Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added resubmit handlers on Pulsar connection failures #7053

merged 6 commits into from Dec 21, 2018
@@ -55,7 +55,7 @@ def __init__(self, job_wrapper, url_to_destination, job_config):
module_name = job_config.dynamic_params['rules_module']
self.rules_module = importlib.import_module(module_name)

def __invoke_expand_function(self, expand_function, destination_params):
def __invoke_expand_function(self, expand_function, destination):
function_arg_names = inspect.getargspec(expand_function).args
app =
possible_args = {
@@ -64,15 +64,16 @@ def __invoke_expand_function(self, expand_function, destination_params):
"job_wrapper": self.job_wrapper,
"rule_helper": RuleHelper(app),
"app": app
"app": app,
"referrer": destination

actual_args = {}

# Send through any job_conf.xml defined args to function
for destination_param in destination_params.keys():
for destination_param in destination.params.keys():
if destination_param in function_arg_names:
actual_args[destination_param] = destination_params[destination_param]
actual_args[destination_param] = destination.params[destination_param]

# Populate needed args
for possible_arg_name in possible_args:
@@ -196,7 +197,7 @@ def __handle_dynamic_job_destination(self, destination):
return self.__handle_rule(expand_function, destination)

def __handle_rule(self, rule_function, destination):
job_destination = self.__invoke_expand_function(rule_function, destination.params)
job_destination = self.__invoke_expand_function(rule_function, destination)
if not isinstance(job_destination,
job_destination_rep = str(job_destination) # Should be either id or url
if '://' in job_destination_rep:
@@ -328,6 +328,8 @@ def __prepare_job(self, job_wrapper, job_destination):
client = None
remote_job_config = None
compute_environment = None

fail_or_resubmit = False
client = self.get_client_from_wrapper(job_wrapper)
tool = job_wrapper.tool
@@ -377,16 +379,21 @@ def __prepare_job(self, job_wrapper, job_destination):
except UnsupportedPulsarException as e:, exception=False)
log.exception("failure running job %d", job_wrapper.job_id)
except UnsupportedPulsarException:
log.exception("failure running job %d, unsupported Pulsar target", job_wrapper.job_id)
fail_or_resubmit = True
except PulsarClientTransportError:
log.exception("failure running job %d, Pulsar connection failed", job_wrapper.job_id)
fail_or_resubmit = True
except Exception:"failure preparing job", exception=True)
log.exception("failure running job %d", job_wrapper.job_id)
fail_or_resubmit = True

# If we were able to get a command line, run the job
if not command_line:
job_wrapper.finish('', '')
# If we were unable to get a command line, there was problem
fail_or_resubmit = fail_or_resubmit or not command_line
if fail_or_resubmit:
job_state = self._job_state(job_wrapper.get_job(), job_wrapper)
self.work_queue.put((self.fail_job, job_state))

return command_line, client, remote_job_config, compute_environment

@@ -436,9 +443,11 @@ def get_client_from_wrapper(self, job_wrapper):
if hasattr(job_wrapper, 'task_id'):
job_id = "%s_%s" % (job_id, job_wrapper.task_id)
params = job_wrapper.job_destination.params.copy()
for key, value in params.items():
if value:
params[key] = model.User.expand_user_properties(job_wrapper.get_job().user, value)
user = job_wrapper.get_job().user
if user:
for key, value in params.items():
if value:
params[key] = model.User.expand_user_properties(user, value)

env = getattr(job_wrapper.job_destination, "env", [])
return self.get_client(params, job_id, env)
@@ -497,8 +506,7 @@ def finish_job(self, job_state):
if failed:"Failed to find or download one or more job outputs from remote server.", exception=True)
except Exception:
message = GENERIC_REMOTE_ERROR, exception=True)
self.fail_job(job_state, message=GENERIC_REMOTE_ERROR, exception=True)
log.exception("failure finishing job %d", job_wrapper.job_id)
if not PulsarJobRunner.__remote_metadata(client):
@@ -515,15 +523,18 @@ def finish_job(self, job_state):
log.exception("Job wrapper finish method failed")"Unable to finish job", exception=True)

def fail_job(self, job_state, message=GENERIC_REMOTE_ERROR, full_status=None):
def fail_job(self, job_state, message=GENERIC_REMOTE_ERROR, full_status=None, exception=False):
"""Seperated out so we can use the worker threads for it."""
stdout = ""
stderr = ""
if full_status:
stdout = full_status.get("stdout", "")
stderr = full_status.get("stderr", ""), "fail_message", message), stdout=stdout, stderr=stderr)
self._handle_runner_state('failure', job_state)
if not job_state.runner_state_handled:, "fail_message", message),
stdout=stdout, stderr=stderr, exception=exception)

def check_pid(self, pid):
@@ -538,6 +549,8 @@ def check_pid(self, pid):

def stop_job(self, job_wrapper):
job = job_wrapper.get_job()
if not job.job_runner_external_id:
# if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
client = self.get_client(job.destination_params, job.job_runner_external_id)
job_ext_output_metadata = job.get_external_output_metadata()
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
Slimmed down resubmission_job_conf.xml for testing default resubmission rules.
<plugin id="local" type="runner" load="" workers="2"/>
<plugin id="pulsar_rest" type="runner" load=""/>

<handler id="main"/>

<destinations default="initial_pulsar">
<destination id="initial_pulsar" runner="pulsar_rest">
<!-- Use a reserved, unused ip to force a resubmission -->
<param id="url"></param>
<param id="private_token">some_token</param>
<resubmit condition="unknown_error" destination="local" />

<!-- Upload destination. -->
<destination id="local" runner="local">

<tool id="upload1" destination="local" resources="upload" />
@@ -13,6 +13,7 @@
JOB_RESUBMISSION_TOOL_DETECTED_ALWAYS_ERROR_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_tool_detected_always_error_job_conf.xml")
JOB_RESUBMISSION_TOOL_DETECTED_RESUBMIT_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_tool_detected_resubmit_job_conf.xml")
JOB_RESUBMISSION_JOB_RESOURCES_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_resource_parameters_conf.xml")
JOB_RESUBMISSION_PULSAR_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_pulsar_job_conf.xml")

class _BaseResubmissionIntegerationTestCase(integration_util.IntegrationTestCase):
@@ -178,3 +179,14 @@ def handle_galaxy_config_kwds(cls, config):

def test_dynamic_resubmission(self):

# Verify that a failure to connect to pulsar can trigger a resubmit
class JobResubmissionPulsarIntegrationTestCase(_BaseResubmissionIntegerationTestCase):

def handle_galaxy_config_kwds(cls, config):

def test_resubmit_on_invalid_pulsar_url(self):
@@ -63,7 +63,7 @@ def test_dynamic_mapping_job_conf_params():

def test_dynamic_mapping_function_parameters():
mapper = __mapper(__dynamic_destination(dict(function="check_rule_params")))
mapper = __mapper(__dynamic_destination(dict(function="check_rule_params", param1="referrer_param")))
assert mapper.get_job_destination({}) is DYNAMICALLY_GENERATED_DESTINATION
assert mapper.job_config.rule_response == "all_passed"

@@ -36,6 +36,7 @@ def check_rule_params(
@@ -47,6 +48,8 @@ def check_rule_params(
assert job_wrapper.is_mock_job_wrapper()
assert app ==
assert rule_helper is not None
assert isinstance(referrer, JobDestination)
assert referrer.params['param1'] == "referrer_param"

assert job.user == user
assert == 6789
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.