
Commit

Add seconds_running and seconds_since_queued to condition expression language.
jmchilton committed Dec 14, 2016
1 parent 325b035 commit 18eb1c8
Showing 7 changed files with 81 additions and 28 deletions.
39 changes: 24 additions & 15 deletions lib/galaxy/jobs/runners/local.py
@@ -77,18 +77,9 @@ def __command_line( self, job_wrapper ):
return job_file, exit_code_path

def queue_job( self, job_wrapper ):
# prepare the job
job_destination = job_wrapper.job_destination
include_metadata = asbool( job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
if not self.prepare_job( job_wrapper, include_metadata=include_metadata ):
if not self._prepare_job_local( job_wrapper ):
return

def fail(message):
job_state = JobState(job_wrapper, job_destination)
job_state.fail_message = message
job_state.stop_job = False
self.fail_job(job_state, exception=True)

stderr = stdout = ''
exit_code = 0

@@ -107,7 +98,7 @@ def fail(message):
stderr=stderr_file,
env=self._environ,
preexec_fn=os.setpgrp )
job_wrapper.set_job_destination(job_destination, proc.pid)
job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid)
job_wrapper.change_state( model.Job.states.RUNNING )

terminated = self.__poll_if_needed( proc, job_wrapper, job_id )
@@ -130,17 +121,16 @@ def fail(message):
log.debug('execution finished: %s' % command_line)
except Exception:
log.exception("failure running job %d" % job_wrapper.job_id)
fail("failure running job")
self._fail_job_local(job_wrapper, "failure running job")
return

if not include_metadata:
self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
self._handle_metadata_if_needed(job_wrapper)
# Finish the job!
try:
job_wrapper.finish( stdout, stderr, exit_code )
except:
log.exception("Job wrapper finish method failed")
fail("Unable to finish job")
self._fail_job_local(job_wrapper, "Unable to finish job")

def stop_job( self, job ):
# if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
@@ -175,6 +165,25 @@ def recover( self, job, job_wrapper ):
# local jobs can't be recovered
job_wrapper.change_state( model.Job.states.ERROR, info="This job was killed when Galaxy was restarted. Please retry the job." )

def _fail_job_local( self, job_wrapper, message ):
job_destination = job_wrapper.job_destination
job_state = JobState(job_wrapper, job_destination)
job_state.fail_message = message
job_state.stop_job = False
self.fail_job(job_state, exception=True)

def _handle_metadata_if_needed(self, job_wrapper):
if not self._embed_metadata(job_wrapper):
self._handle_metadata_externally(job_wrapper, resolve_requirements=True)

def _embed_metadata(self, job_wrapper):
job_destination = job_wrapper.job_destination
embed_metadata = asbool(job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB))
return embed_metadata

def _prepare_job_local(self, job_wrapper):
return self.prepare_job(job_wrapper, include_metadata=self._embed_metadata(job_wrapper))

def _check_pid( self, pid ):
try:
os.kill( pid, 0 )
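Note: queue_job() above is refactored so that preparation, metadata handling, and failure reporting live in the new _prepare_job_local, _embed_metadata, _handle_metadata_if_needed, and _fail_job_local helpers, which other runners can reuse. A minimal sketch of that reuse (the subclass name and failure message are made up for illustration; only the helper calls come from this commit):

from galaxy.jobs.runners.local import LocalJobRunner


class SimulatedRunner(LocalJobRunner):
    # Hypothetical runner reusing the helpers extracted in this commit.
    def queue_job(self, job_wrapper):
        if not self._prepare_job_local(job_wrapper):
            return
        # ... pretend to run the job, then report a failure via the shared helper ...
        self._fail_job_local(job_wrapper, "simulated failure")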
27 changes: 26 additions & 1 deletion lib/galaxy/jobs/runners/state_handlers/resubmit.py
@@ -1,10 +1,14 @@
import logging

from datetime import datetime

__all__ = ('failure', )

log = logging.getLogger(__name__)

from galaxy import model
from galaxy.jobs.runners import JobState

from ._safe_eval import safe_eval


@@ -23,13 +27,34 @@ def eval_condition(condition, job_state):
# not set or not a handleable runner state
return False

attempt = 1
now = datetime.utcnow()
last_running_state = None
last_queued_state = None
for state in job_state.job_wrapper.get_job().state_history:
if state.state == model.Job.states.RUNNING:
last_running_state = state
elif state.state == model.Job.states.QUEUED:
last_queued_state = state
elif state.state == model.Job.states.RESUBMITTED:
attempt = attempt + 1

seconds_running = 0
seconds_since_queued = 0
if last_running_state:
seconds_running = (now - last_running_state.create_time).total_seconds()
if last_queued_state:
seconds_since_queued = (now - last_queued_state.create_time).total_seconds()

condition_locals = {
"walltime_reached": runner_state == JobState.runner_states.WALLTIME_REACHED,
"memory_limit_reached": runner_state == JobState.runner_states.MEMORY_LIMIT_REACHED,
"unknown_error": JobState.runner_states.UNKNOWN_ERROR,
"any_failure": True,
"any_potential_job_failure": True, # Add a hook here - later on allow tools to describe things that are definitely input problems.
"attempt": job_state.job_wrapper.get_job().attempt,
"attempt": attempt,
"seconds_running": seconds_running,
"seconds_since_queued": seconds_since_queued,
}

# Small optimization to eliminate the need to parse AST and eval for simple variables.
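The new seconds_running and seconds_since_queued values are exposed to resubmit condition expressions alongside attempt and the failure flags. A minimal sketch of the evaluation step (Galaxy routes the expression through its restricted safe_eval; plain eval() and the example values below are used only for illustration):

condition_locals = {
    "walltime_reached": True,
    "memory_limit_reached": False,
    "any_failure": True,
    "any_potential_job_failure": True,
    "attempt": 2,
    "seconds_running": 3.2,        # example value, as if the job ran ~3 seconds
    "seconds_since_queued": 7.9,   # example value
}

condition = "walltime_reached and seconds_running < 5"
# True here: the job hit its walltime after running less than 5 seconds,
# so a destination with this resubmit condition would trigger a resubmission.
print(eval(condition, {"__builtins__": {}}, condition_locals))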
9 changes: 0 additions & 9 deletions lib/galaxy/model/__init__.py
@@ -618,15 +618,6 @@ def set_state( self, state ):
self.state = state
self.state_history.append( JobStateHistory( self ) )

@property
def attempt( self ):
attempt_count = 1
for state in self.state_history:
if state.state == Job.states.RESUBMITTED:
attempt_count = attempt_count + 1

return attempt_count

def get_param_values( self, app, ignore_errors=False ):
"""
Read encoded parameter values from the database and turn back into a
9 changes: 7 additions & 2 deletions test/integration/resubmission_job_conf.xml
@@ -40,11 +40,16 @@

<!-- This will fail twice and succeed on walltime reached and will fail twice and fail hard else. -->
<destination id="fail_two_attempts" runner="failure_runner">
<param id="test_param">test_value</param>
<resubmit condition="attempt &lt; 3" />
<resubmit condition="attempt == 3 and walltime_reached" destination="retry_test_generic" />
</destination>

<!-- Resubmit only jobs shorter than 5 seconds. -->
<destination id="resubmit_if_short" runner="failure_runner">
<resubmit condition="seconds_running &lt; 5" destination="retry_test_generic" />
<resubmit condition="any_failure" destination="fails_without_resubmission" />
</destination>

<destination id="fails_without_resubmission" runner="failure_runner">
</destination>

@@ -73,7 +78,7 @@

<resources default="test">
<group id="upload"></group>
<group id="test">test_name,failure_state,initial_destination</group>
<group id="test">test_name,failure_state,initial_destination,run_for</group>
</resources>

<tools>
@@ -2,4 +2,5 @@
<param label="Test Name" name="test_name" type="text" value="" help="Test name to connect client with simulated backend failures." />
<param label="Failure Type" name="failure_state" type="text" value="" help="If set, FailsJobRunner fails with this runner state." />
<param label="Initial Destination" name="initial_destination" type="text" value="" help="If set, initial destination to route job to." />
</parameters>
<param label="Run For" name="run_for" type="integer" value="0" help="If greater than 0, wait this long before triggering runtime failure state (memory or walltime)." />
</parameters>
12 changes: 12 additions & 0 deletions test/integration/resubmission_runners.py
@@ -1,3 +1,6 @@
import time

from galaxy import model
from galaxy.jobs.runners import (
JobState
)
@@ -8,9 +11,18 @@ class FailsJobRunner(LocalJobRunner):
"""Job runner that fails with runner state specified via job resource parameters."""

def queue_job(self, job_wrapper):
if not self._prepare_job_local(job_wrapper):
return

resource_parameters = job_wrapper.get_resource_parameters()
failure_state = resource_parameters.get("failure_state", None)

if failure_state in (JobState.runner_states.WALLTIME_REACHED, JobState.runner_states.MEMORY_LIMIT_REACHED):
job_wrapper.change_state(model.Job.states.RUNNING)
run_for = int(resource_parameters.get("run_for", 0))
if run_for > 0:
time.sleep(run_for)

job_state = JobState(
job_wrapper,
job_wrapper.job_destination
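The run_for resource parameter lets FailsJobRunner hold the job in the RUNNING state for a configurable number of seconds before reporting the walltime or memory failure, so seconds_running in the resubmit condition roughly tracks run_for. A rough sketch of the arithmetic the new integration test relies on (the 5-second threshold comes from the resubmit_if_short destination above; the approximation ignores scheduling overhead):

def would_resubmit(run_for_seconds, threshold=5):
    # seconds_running is measured from the last RUNNING state change, so after
    # sleeping run_for seconds it is approximately run_for when the failure is handled.
    approximate_seconds_running = run_for_seconds
    return approximate_seconds_running < threshold


print(would_resubmit(1))   # True  -> resubmitted to retry_test_generic
print(would_resubmit(15))  # False -> routed to fails_without_resubmission and fails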
10 changes: 10 additions & 0 deletions test/integration/test_job_resubmission.py
@@ -65,6 +65,16 @@ def test_condition_attempt(self):
"initial_destination": "fail_two_attempts",
"failure_state": "walltime_reached"})

def test_condition_seconds_running(self):
self._assert_job_passes(resource_parameters={"test_name": "test_condition_seconds_running",
"initial_destination": "resubmit_if_short",
"failure_state": "walltime_reached",
"run_for": "1"})
self._assert_job_fails(resource_parameters={"test_name": "test_condition_seconds_running",
"initial_destination": "resubmit_if_short",
"failure_state": "walltime_reached",
"run_for": "15"})

def _assert_job_passes(self, resource_parameters):
self._run_tool_test("simple_constructs", resource_parameters=resource_parameters)

