Merge pull request #228 from jmchilton/amqp-status-poll
Respond to MQ messages requesting status updates.
jmchilton committed Jun 24, 2020
2 parents 7a1fdbe + ca0e67f commit c086312
Showing 6 changed files with 137 additions and 31 deletions.
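
At a high level, this change adds a third server-side consumer ("status", alongside the existing "setup" and "kill" consumers) and a client helper for publishing to it; the answer comes back asynchronously on the status_update queue clients already consume. A minimal sketch of issuing such a poll directly against the exchange, modeled on the test helper further down (the AMQP URL and manager name are placeholders, not values from this commit):

from pulsar.client.amqp_exchange import ACK_FORCE_NOACK_KEY
from pulsar.client.amqp_exchange_factory import get_exchange

# Placeholder connection details - substitute a real broker URL and manager name.
exchange = get_exchange("amqp://guest:guest@localhost:5672//", "_default_", {})

# Fire-and-forget: ACK_FORCE_NOACK_KEY tells the exchange not to expect an
# acknowledgement, since a poller will simply ask again later anyway.
exchange.publish("status", {
    "job_id": "12345",
    ACK_FORCE_NOACK_KEY: True,
})

# On the Pulsar side the status consumer calls
# manager.trigger_state_change_callback(job_id), which re-announces the job's
# current state on the status_update queue ("lost" if the job is unknown).
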
18 changes: 17 additions & 1 deletion pulsar/client/client.py
@@ -19,6 +19,7 @@
actions,
path_type,
)
from .amqp_exchange import ACK_FORCE_NOACK_KEY
from .decorators import parseJson
from .decorators import retry
from .destination import submit_params
@@ -328,6 +329,15 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
launch_params["setup_params"] = setup_params
return launch_params

def _build_status_request_message(self):
# Because this is used to poll, status requests will not be resent if we do not receive an acknowledgement
update_params = {
'request': 'status',
'job_id': self.job_id,
ACK_FORCE_NOACK_KEY: True,
}
return update_params


class MessageJobClient(BaseMessageJobClient):

@@ -342,7 +352,13 @@ def launch(self, command_line, dependencies_description=None, env=[], remote_sta
job_config=job_config,
)
response = self.client_manager.exchange.publish("setup", launch_params)
log.info("Job published to setup message queue.")
log.info("Job published to setup message queue: %s", self.job_id)
return response

def get_status(self):
status_params = self._build_status_request_message()
response = self.client_manager.exchange.publish("setup", status_params)
log.info("Job status request published to setup message queue: %s", self.job_id)
return response

def kill(self):
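
For callers that hold a MessageJobClient rather than a raw exchange, the new get_status() above gives a fire-and-forget poll: the call returns as soon as the request is published, and the actual state arrives later via the status_update consumer. A rough sketch of how a caller might poll, assuming `client` is an already-constructed MessageJobClient (constructing one is outside the scope of this diff):

import time

def poll_job_status(client, interval=60.0):
    # Sketch only: each iteration publishes a no-ack status request; the
    # resulting state change is reported asynchronously, not returned here.
    while True:
        client.get_status()
        time.sleep(interval)
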
2 changes: 1 addition & 1 deletion pulsar/managers/base/base_drmaa.py
@@ -80,7 +80,7 @@ def _build_template_attributes(self, job_id, command_line, dependencies_descript
attributes["nativeSpecification"] = native_specification
log.info("Submitting DRMAA job with nativeSpecification [%s]" % native_specification)
else:
log.debug("Not native specification supplied, DRMAA job will be submitted with default parameters.")
log.debug("No native specification supplied, DRMAA job will be submitted with default parameters.")
return attributes


3 changes: 2 additions & 1 deletion pulsar/managers/base/external.py
@@ -40,7 +40,8 @@ def get_status(self, job_id):
return status.CANCELLED
external_id = self._external_id(job_id)
if not external_id:
raise KeyError("Failed to find external id for job_id %s" % job_id)
log.warning("Failed to find external id for job_id %s", job_id)
return status.LOST
return self._get_status_external(external_id)

def _register_external_id(self, job_id, external_id):
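
With the change above, callers of get_status() should expect a "lost" result for unknown job ids instead of a KeyError. A small sketch of caller-side handling; handle_lost_job is a hypothetical placeholder, and the status constants are assumed to live in pulsar.managers.status as the rest of the managers code suggests:

from pulsar.managers import status

job_status = manager.get_status(job_id)
if job_status == status.LOST:
    # The manager has no record of this job (it may never have existed);
    # stop polling instead of retrying forever.
    handle_lost_job(job_id)  # hypothetical caller-side handler
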
8 changes: 8 additions & 0 deletions pulsar/managers/stateful.py
@@ -63,6 +63,11 @@ def set_state_change_callback(self, state_change_callback):
def _default_status_change_callback(self, status, job_id):
log.info("Status of job [%s] changed to [%s]. No callbacks enabled." % (job_id, status))

def trigger_state_change_callback(self, job_id):
proxy_status = self.get_status(job_id)
log.debug("Triggering state change callback with status %s by request for job_id %s", proxy_status, job_id)
self.__state_change_callback(proxy_status, job_id)

@property
def name(self):
return self._proxied_manager.name
@@ -145,6 +150,9 @@ def get_status(self, job_id):
and track additional state information needed.
"""
job_directory = self._proxied_manager.job_directory(job_id)
if not job_directory.exists():
return status.LOST

with job_directory.lock("status"):
proxy_status, state_change = self.__proxy_status(job_directory, job_id)

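
trigger_state_change_callback simply re-fires whatever callback was registered via set_state_change_callback with the job's current status, which is what lets a queued status request be answered without any new plumbing. A small sketch, with a hypothetical callback standing in for the publisher Pulsar wires up in practice:

def announce(status, job_id):
    # Hypothetical stand-in for the callback that publishes to status_update.
    log.info("Job [%s] is currently [%s]", job_id, status)

manager.set_state_change_callback(announce)
# Later, e.g. when a status request arrives over the message queue:
manager.trigger_state_change_callback(job_id)  # reports "lost" if the job directory is gone
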
66 changes: 38 additions & 28 deletions pulsar/messaging/bind_amqp.py
@@ -43,6 +43,7 @@ def bind_manager_to_queue(manager, queue_state, connection_string, conf):

process_setup_messages = functools.partial(__process_setup_message, manager)
process_kill_messages = functools.partial(__process_kill_message, manager)
process_status_messages = functools.partial(__process_status_message, manager)

def drain(callback, name):
__drain(name, queue_state, pulsar_exchange, callback)
@@ -51,8 +52,9 @@ def drain(callback, name):
if conf.get("message_queue_consume", True):
setup_thread = start_setup_consumer(pulsar_exchange, functools.partial(drain, process_setup_messages, "setup"))
kill_thread = start_kill_consumer(pulsar_exchange, functools.partial(drain, process_kill_messages, "kill"))
status_thread = start_status_consumer(pulsar_exchange, functools.partial(drain, process_status_messages, "status"))
if hasattr(queue_state, "threads"):
queue_state.threads.extend([setup_thread, kill_thread])
queue_state.threads.extend([setup_thread, kill_thread, status_thread])
if conf.get("amqp_acknowledge", False):
status_update_ack_thread = start_status_update_ack_consumer(pulsar_exchange, functools.partial(drain, None, "status_update_ack"))
getattr(queue_state, 'threads', []).append(status_update_ack_thread)
@@ -87,40 +89,48 @@ def __start_consumer(name, exchange, target):

start_setup_consumer = functools.partial(__start_consumer, "setup")
start_kill_consumer = functools.partial(__start_consumer, "kill")
start_status_consumer = functools.partial(__start_consumer, "status")
start_status_update_ack_consumer = functools.partial(__start_consumer, "status_update_ack")


def __drain(name, queue_state, pulsar_exchange, callback):
pulsar_exchange.consume(name, callback=callback, check=queue_state)


def __process_kill_message(manager, body, message):
if message.acknowledged:
log.info("Message is already acknowledged (by an upstream callback?), Pulsar will not handle this message")
return
try:
job_id = __client_job_id_from_body(body)
assert job_id, 'Could not parse job id from body: %s' % body
log.debug("Received message in kill queue for Pulsar job id: %s", job_id)
manager.kill(job_id)
except Exception:
log.exception("Failed to kill job.")
message.ack()


def __process_setup_message(manager, body, message):
if message.acknowledged:
log.info("Message is already acknowledged (by an upstream callback?), Pulsar will not handle this message")
return
try:
job_id = __client_job_id_from_body(body)
assert job_id, 'Could not parse job id from body: %s' % body
log.debug("Received message in setup queue for Pulsar job id: %s", job_id)
manager_endpoint_util.submit_job(manager, body)
except Exception:
job_id = job_id or 'unknown'
log.exception("Failed to setup job %s obtained via message queue." % job_id)
message.ack()
def __processes_message(f):

@functools.wraps(f)
def process_message(manager, body, message):
if message.acknowledged:
log.info("Message is already acknowledged (by an upstream callback?), Pulsar will not handle this message")
return

job_id = None
try:
job_id = __client_job_id_from_body(body)
assert job_id, 'Could not parse job id from body: %s' % body
f(manager, body, job_id)
except Exception:
job_id = job_id or 'unknown'
log.exception("Failed to process message with function %s for job_id %s" % (f.__name__, job_id))
message.ack()

return process_message


@__processes_message
def __process_kill_message(manager, body, job_id):
manager.kill(job_id)


@__processes_message
def __process_setup_message(manager, body, job_id):
manager_endpoint_util.submit_job(manager, body)


@__processes_message
def __process_status_message(manager, body, job_id):
manager.trigger_state_change_callback(job_id)


def __client_job_id_from_body(body):
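
The __processes_message decorator factors out the boilerplate the old kill and setup handlers duplicated: skip messages that are already acknowledged, parse and validate the job id, log failures, and always ack. A new handler only has to supply the per-message action; a purely hypothetical extra handler (not part of this commit) would look like:

@__processes_message
def __process_ping_message(manager, body, job_id):
    # Hypothetical example only. The decorator has already extracted job_id
    # from the body and will acknowledge the message when this returns.
    log.debug("Ping received for Pulsar job id: %s", job_id)
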
71 changes: 71 additions & 0 deletions test/integration_test_state.py
@@ -14,6 +14,7 @@
submit_job,
)
from pulsar.managers.stateful import ActiveJobs
from pulsar.client.amqp_exchange import ACK_FORCE_NOACK_KEY
from pulsar.client.amqp_exchange_factory import get_exchange
from pulsar.managers.util.drmaa import DrmaaSessionFactory

@@ -117,6 +118,66 @@ def test_staging_failure_fires_failed_status(self):
assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "failed"

@skip_unless_module("kombu")
@integration_test
def test_async_request_of_mq_status(self):
test = "async_request_of_mq_status"
with self._setup_app_provider(test, manager_type="queued_python") as app_provider:
job_id = '12345'

consumer = self._status_update_consumer(test)
consumer.start()

with app_provider.new_app() as app:
manager = app.only_manager
job_info = {
'job_id': job_id,
'command_line': 'sleep 1000',
'setup': True,
# Invalid staging description...
'remote_staging': {"setup": [{"moo": "cow"}]}
}
# TODO: redo this with submit_job coming through MQ for test consistency.
submit_job(manager, job_info)
self._request_status(test, job_id)

import time
time.sleep(2)
consumer.wait_for_messages()
consumer.join()

messages = consumer.messages
assert len(messages) == 2, len(messages)
assert messages[0]["status"] == "failed"
assert messages[1]["status"] == "failed", messages[1]

@skip_unless_module("kombu")
@integration_test
def test_async_request_of_mq_status_lost(self):
test = "async_request_of_mq_status_lost"
with self._setup_app_provider(test, manager_type="queued_python") as app_provider:
job_id = '12347' # should be lost? - never existed right?

consumer = self._status_update_consumer(test)
consumer.start()

with app_provider.new_app() as app:
app.only_manager
# do two messages to ensure generation of status message doesn't
# create a job directory we don't mean to or something like that
self._request_status(test, job_id)
self._request_status(test, job_id)

import time
time.sleep(2)
consumer.wait_for_messages()
consumer.join()

messages = consumer.messages
assert len(messages) == 2, len(messages)
assert messages[0]["status"] == "lost", messages[0]
assert messages[1]["status"] == "lost", messages[1]

@skip_unless_module("kombu")
@integration_test
def test_setup_failure_fires_failed_status(self):
@@ -166,6 +227,16 @@ def _status_update_consumer(self, test):
consumer = SimpleConsumer(queue="status_update", url=mq_url, manager=manager)
return consumer

def _request_status(self, test, job_id):
mq_url = "memory://test_%s" % test
manager = "manager_%s" % test
exchange = get_exchange(mq_url, manager, {})
params = {
"job_id": job_id,
ACK_FORCE_NOACK_KEY: True,
}
exchange.publish("status", params)


class SimpleConsumer(object):

