diff --git a/lib/galaxy/dependencies/pinned-requirements.txt b/lib/galaxy/dependencies/pinned-requirements.txt index e7cf1f026067..52a22ed66ca4 100644 --- a/lib/galaxy/dependencies/pinned-requirements.txt +++ b/lib/galaxy/dependencies/pinned-requirements.txt @@ -47,7 +47,7 @@ anyjson==0.3.3 # Pulsar requirements psutil==4.1.0 -pulsar-galaxy-lib==0.7.0.dev4 +pulsar-galaxy-lib==0.7.0.dev5 # sqlalchemy-migrate and dependencies sqlalchemy-migrate==0.10.0 diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index d2f1b6d95f7b..dad1b19feabf 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -18,6 +18,7 @@ from pulsar.client import PulsarOutputs from pulsar.client import ClientOutputs from pulsar.client import PathMapper +from pulsar.client import PulsarClientTransportError import pulsar.core @@ -63,6 +64,10 @@ valid=specs.is_in("urllib", "curl", None), default=None ), + transport_timeout=dict( + map=lambda val: None if val == "None" else int(val), + default=None, + ), cache=dict( map=specs.to_bool_or_none, default=None, @@ -191,7 +196,7 @@ def __init_client_manager( self ): client_manager_kwargs[ "file_cache" ] = None for kwd in self.runner_params.keys(): - if kwd.startswith( 'amqp_' ): + if kwd.startswith( 'amqp_' ) or kwd.startswith( 'transport_' ): client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] self.client_manager = build_client_manager(**client_manager_kwargs) @@ -224,6 +229,9 @@ def check_watched_item(self, job_state): try: client = self.get_client_from_state(job_state) status = client.get_status() + except PulsarClientTransportError as exc: + log.error("Communication error with Pulsar server on state check, will retry: %s", exc) + return job_state except Exception: # An orphaned job was put into the queue at app startup, so remote server went down # either way we are done I guess.