Commit ff8e821
Merge pull request #704 from natefoo/amqp_ack
[15.07] With Pulsar, ensure AMQP consumers receive published messages
jmchilton committed Sep 10, 2015
2 parents 1203ccc + 8778da4 commit ff8e821
Showing 15 changed files with 405 additions and 59 deletions.
13 changes: 13 additions & 0 deletions config/job_conf.xml.sample_advanced
@@ -54,6 +54,19 @@
galaxy_infrastructure_url is set in galaxy.ini.
-->
<param id="galaxy_url">http://localhost:8080</param>
<!-- AMQP does not guarantee that a published message is received by
the AMQP server, so Galaxy/Pulsar can request that the consumer
acknowledge messages and will resend them if acknowledgement is
not received after a configurable timeout. -->
<!-- <param id="amqp_acknowledge">False</param> -->
<!-- Galaxy reuses Pulsar's persistence_directory parameter (via the
Pulsar client lib) to store a record of received
acknowledgements, and to keep track of messages which have not
been acknowledged. -->
<!-- <param id="persistence_directory">/path/to/dir</param> -->
<!-- Number of seconds to wait for an acknowledgement before
republishing a message. -->
<!-- <param id="amqp_republish_time">30</param> -->
<!-- Pulsar job manager to communicate with (see Pulsar
docs for information on job managers). -->
<!-- <param id="manager">_default_</param> -->
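For illustration, the acknowledgement options above might reach the Pulsar client library as a parameter mapping like the following. This is a sketch only: the values mirror the commented-out samples, and the directory path is a placeholder.

# Sketch: destination params as the Pulsar client library might receive them
# (values mirror the commented-out samples above; the path is a placeholder).
destination_params = {
    'galaxy_url': 'http://localhost:8080',
    'amqp_acknowledge': True,
    'persistence_directory': '/path/to/dir',
    'amqp_ack_republish_time': 30,
    'manager': '_default_',
}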
11 changes: 10 additions & 1 deletion lib/galaxy/jobs/runners/pulsar.py
@@ -61,6 +61,14 @@
map=specs.to_str_or_none,
default=None,
),
persistence_directory=dict(
map=specs.to_str_or_none,
default=None,
),
amqp_acknowledge=dict(
map=specs.to_bool_or_none,
default=None
),
amqp_consumer_timeout=dict(
map=lambda val: None if val == "None" else float(val),
default=None,
@@ -145,7 +153,7 @@ def _monitor( self ):

def __init_client_manager( self ):
client_manager_kwargs = {}
for kwd in 'manager', 'cache', 'transport':
for kwd in 'manager', 'cache', 'transport', 'persistence_directory':
client_manager_kwargs[ kwd ] = self.runner_params[ kwd ]
for kwd in self.runner_params.keys():
if kwd.startswith( 'amqp_' ):
@@ -611,6 +619,7 @@ def _monitor( self ):
# This is a message queue driven runner, don't monitor
# just setup required callback.
self.client_manager.ensure_has_status_update_callback(self.__async_update)
self.client_manager.ensure_has_ack_consumers()

def __async_update( self, full_status ):
job_id = None
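In effect, the loops above forward the explicitly named keys plus every amqp_*-prefixed option to the client manager. A minimal standalone sketch of that collection logic, with hypothetical runner params (names and values illustrative):

# Hypothetical runner params as Galaxy might parse them from job_conf.xml;
# the persistence_directory path is a placeholder.
runner_params = {
    'manager': '_default_',
    'cache': None,
    'transport': None,
    'persistence_directory': '/var/lib/galaxy/pulsar_amqp_ack',
    'amqp_acknowledge': True,
    'amqp_ack_republish_time': 30,
}
client_manager_kwargs = {}
for kwd in 'manager', 'cache', 'transport', 'persistence_directory':
    client_manager_kwargs[kwd] = runner_params[kwd]
for kwd in runner_params.keys():
    if kwd.startswith('amqp_'):
        client_manager_kwargs[kwd] = runner_params[kwd]
# client_manager_kwargs now carries both the explicit keys and all amqp_* options.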
22 changes: 17 additions & 5 deletions lib/pulsar/client/action_mapper.py
@@ -83,7 +83,9 @@ class FileActionMapper(object):
... f.close()
... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
... mapper = FileActionMapper(mock_client)
... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and deserialize it to make sure still works
... as_dict = mapper.to_dict()
... # print(as_dict["paths"])
... mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works
... unlink(f.name)
... return mapper
>>> mapper = mapper_for(default_action='none', config_contents=json_string)
@@ -171,7 +173,7 @@ def to_dict(self):
ssh_user=self.ssh_user,
ssh_port=self.ssh_port,
ssh_host=self.ssh_host,
paths=map(lambda m: m.to_dict(), self.mappers)
paths=list(map(lambda m: m.to_dict(), self.mappers))
)

def __client_to_config(self, client):
@@ -261,7 +263,7 @@ def unstructured_map(self, path_helper):
if self.staging_needed:
# To ensure uniqueness, prepend unique prefix to each name
prefix = unique_path_prefix(self.path)
for path, name in unstructured_map.iteritems():
for path, name in unstructured_map.items():
unstructured_map[path] = join(prefix, name)
else:
path_rewrites = {}
@@ -280,6 +282,16 @@ def staging_needed(self):
def staging_action_local(self):
return self.staging == STAGING_ACTION_LOCAL

def to_dict(self):
return dict(action_type=self.action_type)

    def __str__(self):
        as_dict = self.to_dict()
        attributes = ["%s=%s" % (key, value) for key, value in as_dict.items()]
        return "FileAction[%s]" % ",".join(attributes)


class NoneAction(BaseAction):
""" This action indicates the corresponding path does not require any
@@ -446,7 +458,7 @@ def _serialized_key(self):
def __serialize_ssh_key(self):
f = tempfile.NamedTemporaryFile(delete=False)
if self.ssh_key is not None:
f.write(self.ssh_key)
f.write(self.ssh_key.encode("utf-8"))
else:
raise Exception("SSH_KEY not available")
return f.name
@@ -644,7 +656,7 @@ def to_dict(self):


def mappers_from_dicts(mapper_def_list):
return map(lambda m: _mappper_from_dict(m), mapper_def_list)
return list(map(lambda m: _mappper_from_dict(m), mapper_def_list))


def _mappper_from_dict(mapper_dict):
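The action_mapper.py changes are Python 3 compatibility fixes: map() returns a lazy iterator on Python 3, so results destined for JSON serialization must be wrapped in list(); dict.iteritems() is gone, while items() works on both versions; and SSH key text must be encoded to bytes before being written to a temp file opened in binary mode. A small standalone illustration of the map() pitfall:

import json

mappers = [{"action_type": "none"}]

# On Python 2, map() returns a list and json.dumps accepts it directly;
# on Python 3 it returns a lazy iterator, which json.dumps rejects.
lazy = map(lambda m: m, mappers)
try:
    json.dumps({"paths": lazy})
except TypeError:
    pass  # Python 3: "Object of type map is not JSON serializable"

# Wrapping in list() behaves identically on both versions:
print(json.dumps({"paths": list(map(lambda m: m, mappers))}))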
103 changes: 101 additions & 2 deletions lib/pulsar/client/amqp_exchange.py
@@ -3,7 +3,7 @@
import socket
import logging
import threading
from time import sleep
from time import sleep, time

try:
import kombu
@@ -26,6 +26,14 @@
DEFAULT_HEARTBEAT_WAIT = 1
DEFAULT_HEARTBEAT_JOIN_TIMEOUT = 10

ACK_QUEUE_SUFFIX = "_ack"
ACK_UUID_KEY = 'acknowledge_uuid'
ACK_QUEUE_KEY = 'acknowledge_queue'
ACK_UUID_RESPONSE_KEY = 'acknowledge_uuid_response'
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
DEFAULT_REPUBLISH_TIME = 30


class PulsarExchange(object):
""" Utility for publishing and consuming structured Pulsar queues using kombu.
@@ -47,6 +55,9 @@ def __init__(
connect_ssl=None,
timeout=DEFAULT_TIMEOUT,
publish_kwds={},
publish_uuid_store=None,
consume_uuid_store=None,
republish_time=DEFAULT_REPUBLISH_TIME,
):
"""
"""
@@ -57,27 +68,43 @@
self.__connect_ssl = connect_ssl
self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
self.__timeout = timeout
self.__republish_time = republish_time
# Be sure to log message publishing failures.
if publish_kwds.get("retry", False):
if "retry_policy" not in publish_kwds:
publish_kwds["retry_policy"] = {}
if "errback" not in publish_kwds["retry_policy"]:
publish_kwds["retry_policy"]["errback"] = self.__publish_errback
self.__publish_kwds = publish_kwds
self.publish_uuid_store = publish_uuid_store
self.consume_uuid_store = consume_uuid_store
self.publish_ack_lock = threading.Lock()

@property
def url(self):
return self.__url

@property
def acks_enabled(self):
return self.publish_uuid_store is not None

def consume(self, queue_name, callback, check=True, connection_kwargs={}):
queue = self.__queue(queue_name)
log.debug("Consuming queue '%s'", queue)
callbacks = [self.__ack_callback]
if callback is not None:
callbacks.append(callback)
while check:
heartbeat_thread = None
try:
with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']):
with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
# Ack manager should sleep before checking for
                        # republishes, but if that changes, need to drain the
# queue once before the ack manager starts doing its
# thing
self.__start_ack_manager(queue_name)
while check and connection.connected:
try:
connection.drain_events(timeout=self.__timeout)
@@ -90,6 +117,41 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
raise
log.info("Done consuming queue %s" % queue_name)

def __ack_callback(self, body, message):
if ACK_UUID_KEY in body:
# The consumer of a normal queue has received a message requiring
# acknowledgement
ack_uuid = body[ACK_UUID_KEY]
ack_queue = body[ACK_QUEUE_KEY]
response = {ACK_UUID_RESPONSE_KEY: ack_uuid}
log.debug('Acknowledging UUID %s on queue %s', ack_uuid, ack_queue)
self.publish(ack_queue, response)
if self.consume_uuid_store is None:
log.warning('Received an ack request (UUID: %s, response queue: '
'%s) but ack UUID persistence is not enabled, check '
'your config', ack_uuid, ack_queue)
elif ack_uuid not in self.consume_uuid_store:
# This message has not been seen before, store the uuid so it
# is not operated on more than once
self.consume_uuid_store[ack_uuid] = time()
else:
# This message has been seen before, prevent downstream
# callbacks from processing normally by acknowledging it here,
# still send the ack reply
                log.warning('Message with UUID %s on queue %s has already '
                            'been processed, skipping callback', ack_uuid, ack_queue)
message.ack()
elif ACK_UUID_RESPONSE_KEY in body:
# The consumer of an ack queue has received an ack, remove it from the store
ack_uuid = body[ACK_UUID_RESPONSE_KEY]
log.debug('Got acknowledgement for UUID %s, will remove from store', ack_uuid)
try:
with self.publish_ack_lock:
del self.publish_uuid_store[ack_uuid]
except KeyError:
log.warning('Cannot remove UUID %s from store, already removed', ack_uuid)
message.ack()

def __handle_io_error(self, exc, heartbeat_thread):
# In testing, errno is None
log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
@@ -120,6 +182,16 @@ def publish(self, name, payload):
key = self.__queue_name(name)
publish_log_prefix = self.__publish_log_prefex(transaction_uuid)
log.debug("%sBegin publishing to key %s", publish_log_prefix, key)
if (self.acks_enabled and not name.endswith(ACK_QUEUE_SUFFIX)
and ACK_FORCE_NOACK_KEY not in payload):
# Publishing a message on a normal queue and it's not a republish
# (or explicitly forced do-not-ack), so add ack keys
ack_uuid = str(transaction_uuid)
ack_queue = name + ACK_QUEUE_SUFFIX
payload[ACK_UUID_KEY] = ack_uuid
payload[ACK_QUEUE_KEY] = ack_queue
self.publish_uuid_store[ack_uuid] = payload
log.debug('Requesting acknowledgement of UUID %s on queue %s', ack_uuid, ack_queue)
with self.connection(self.__url) as connection:
with pools.producers[connection].acquire() as producer:
log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key)
@@ -134,6 +206,25 @@ def publish(self, name, payload):
)
log.debug("%sPublished to key %s", publish_log_prefix, key)

def ack_manager(self, queue_name):
log.debug('Acknowledgement manager thread alive')
resubmit_queue = queue_name[:-len(ACK_QUEUE_SUFFIX)]
try:
while True:
sleep(DEFAULT_ACK_MANAGER_SLEEP)
with self.publish_ack_lock:
for unack_uuid in self.publish_uuid_store.keys():
if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time:
log.debug('UUID %s has not been acknowledged, republishing original message', unack_uuid)
payload = self.publish_uuid_store[unack_uuid]
payload[ACK_FORCE_NOACK_KEY] = True
self.publish(resubmit_queue, payload)
self.publish_uuid_store.set_time(unack_uuid)
except:
log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!")
raise
        log.debug('Acknowledgement manager thread exiting')

def __prepare_publish_kwds(self, publish_log_prefix):
if "retry_policy" in self.__publish_kwds:
publish_kwds = copy.deepcopy(self.__publish_kwds)
@@ -182,3 +273,11 @@ def __start_heartbeat(self, queue_name, connection):
thread = threading.Thread(name=thread_name, target=self.heartbeat, args=(connection,))
thread.start()
return thread

def __start_ack_manager(self, queue_name):
if self.acks_enabled and queue_name.endswith(ACK_QUEUE_SUFFIX):
thread_name = "acknowledgement-manager-%s" % (self.__queue_name(queue_name))
thread = threading.Thread(name=thread_name, target=self.ack_manager, args=(queue_name,))
thread.daemon = True
thread.start()
return thread
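To make the handshake concrete, the following broker-free sketch walks the full round trip that the methods above implement. Only the payload key names (matching the ACK_* constants) come from the code; plain dicts stand in for kombu, the AMQP server, the disk-backed UUID stores, and the threads, and everything else is illustrative.

import time
import uuid

REPUBLISH_TIME = 30

broker = {'setup': [], 'setup_ack': []}   # stand-in for the AMQP server
publish_uuid_store = {}                   # UUID -> original payload (publisher side)
publish_times = {}                        # UUID -> time of last publish
consume_uuid_store = {}                   # UUID -> time first seen (consumer side)


def publish(queue_name, payload):
    # Like PulsarExchange.publish: tag normal messages for acknowledgement.
    if not queue_name.endswith('_ack') and 'force_noack' not in payload:
        ack_uuid = str(uuid.uuid4())
        payload['acknowledge_uuid'] = ack_uuid
        payload['acknowledge_queue'] = queue_name + '_ack'
        publish_uuid_store[ack_uuid] = payload
        publish_times[ack_uuid] = time.time()
    broker[queue_name].append(payload)


def drain(queue_name):
    # Like __ack_callback: answer ack requests, skip duplicate deliveries,
    # and clear acknowledged UUIDs from the publisher-side store.
    messages, broker[queue_name] = broker[queue_name], []
    for body in messages:
        if 'acknowledge_uuid' in body:
            ack_uuid = body['acknowledge_uuid']
            publish(body['acknowledge_queue'], {'acknowledge_uuid_response': ack_uuid})
            if ack_uuid in consume_uuid_store:
                continue  # already handled once, skip downstream callbacks
            consume_uuid_store[ack_uuid] = time.time()
        elif 'acknowledge_uuid_response' in body:
            publish_uuid_store.pop(body['acknowledge_uuid_response'], None)


def ack_manager():
    # Like PulsarExchange.ack_manager: republish anything unacknowledged past
    # the window, flagged so publish() does not re-tag it.
    for unack_uuid, payload in list(publish_uuid_store.items()):
        if publish_times[unack_uuid] < time.time() - REPUBLISH_TIME:
            payload['force_noack'] = True
            publish(payload['acknowledge_queue'][:-len('_ack')], payload)
            publish_times[unack_uuid] = time.time()  # like set_time: restart clock


publish('setup', {'job_id': '42'})
drain('setup')       # the consumer acknowledges the message...
drain('setup_ack')   # ...and the publisher clears the UUID from its store
assert not publish_uuid_store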
17 changes: 16 additions & 1 deletion lib/pulsar/client/amqp_exchange_factory.py
@@ -1,5 +1,5 @@
from .amqp_exchange import PulsarExchange
from .util import filter_destination_params
from .util import filter_destination_params, MessageQueueUUIDStore


def get_exchange(url, manager_name, params):
@@ -9,6 +9,8 @@ def get_exchange(url, manager_name, params):
connect_ssl=connect_ssl,
publish_kwds=parse_amqp_publish_kwds(params)
)
if params.get('amqp_acknowledge', False):
exchange_kwds.update(parse_ack_kwds(params, manager_name))
timeout = params.get('amqp_consumer_timeout', False)
if timeout is not False:
exchange_kwds['timeout'] = timeout
@@ -39,3 +41,16 @@ def parse_amqp_publish_kwds(params):
if retry_policy_params:
all_publish_params["retry_policy"] = retry_policy_params
return all_publish_params


def parse_ack_kwds(params, manager_name):
ack_params = {}
persistence_directory = params.get('persistence_directory', None)
if persistence_directory:
subdirs = ['amqp_ack-%s' % manager_name]
ack_params['publish_uuid_store'] = MessageQueueUUIDStore(persistence_directory, subdirs=subdirs + ['publish'])
ack_params['consume_uuid_store'] = MessageQueueUUIDStore(persistence_directory, subdirs=subdirs + ['consume'])
republish_time = params.get('amqp_ack_republish_time', None)
if republish_time:
ack_params['republish_time'] = int(republish_time)
return ack_params
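A hedged usage sketch for this factory: with acknowledgement enabled, each manager gets its own publish and consume UUID stores beneath the persistence directory. The broker URL and directory below are placeholders.

# Hypothetical destination params, as parsed from job_conf.xml
# (URL and path are placeholders):
params = {
    'amqp_acknowledge': True,
    'persistence_directory': '/var/lib/pulsar/persisted_data',
    'amqp_ack_republish_time': 30,
}
exchange = get_exchange('amqp://guest:guest@localhost:5672//', '_default_', params)
# The UUID stores would live under
# /var/lib/pulsar/persisted_data/amqp_ack-_default_/publish and .../consume.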
20 changes: 12 additions & 8 deletions lib/pulsar/client/client.py
@@ -1,12 +1,14 @@
import os
from json import dumps
from json import loads

from six import string_types

from .destination import submit_params
from .setup_handler import build as build_setup_handler
from .job_directory import RemoteJobDirectory
from .decorators import parseJson
from .decorators import retry
from .util import json_dumps
from .util import json_loads
from .util import copy
from .util import ensure_directory
from .util import to_base64_json
@@ -109,19 +111,19 @@ def launch(self, command_line, dependencies_description=None, env=[], remote_sta
launch_params = dict(command_line=command_line, job_id=self.job_id)
submit_params_dict = submit_params(self.destination_params)
if submit_params_dict:
launch_params['params'] = dumps(submit_params_dict)
launch_params['params'] = json_dumps(submit_params_dict)
if dependencies_description:
launch_params['dependencies_description'] = dumps(dependencies_description.to_dict())
launch_params['dependencies_description'] = json_dumps(dependencies_description.to_dict())
if env:
launch_params['env'] = dumps(env)
launch_params['env'] = json_dumps(env)
if remote_staging:
launch_params['remote_staging'] = dumps(remote_staging)
launch_params['remote_staging'] = json_dumps(remote_staging)
if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
# destination arguments. Hence, must have Pulsar setup job
# before queueing.
setup_params = _setup_params_from_job_config(job_config)
launch_params["setup_params"] = dumps(setup_params)
launch_params["setup_params"] = json_dumps(setup_params)
return self._raw_execute("submit", launch_params)

def full_status(self):
@@ -174,10 +176,12 @@ def put_file(self, path, input_type, name=None, contents=None, action_type='tran
# action type == 'message' should either copy or transfer
# depending on default not just fallback to transfer.
if action_type in ['transfer', 'message']:
if isinstance(contents, string_types):
contents = contents.encode("utf-8")
return self._upload_file(args, contents, input_path)
elif action_type == 'copy':
path_response = self._raw_execute('path', args)
pulsar_path = loads(path_response)['path']
pulsar_path = json_loads(path_response)['path']
copy(path, pulsar_path)
return {'path': pulsar_path}

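Alongside swapping the stdlib dumps/loads for the json_dumps/json_loads helpers from .util (their implementation is not shown here), the put_file change guards the upload path against Python 3's text/bytes split before handing contents to the transport. A minimal standalone illustration using six, as the module above does; the sample contents are invented:

from six import string_types

contents = u"tool input data"  # contents may arrive as text
if isinstance(contents, string_types):
    contents = contents.encode("utf-8")  # transports expect bytes on Python 3
assert isinstance(contents, bytes)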
