Skip to content

Commit

Permalink
馃憣 IMPROVE: Catch state change broadcast timeout (#209)
Browse files Browse the repository at this point in the history
When using an RMQ communicator, the broadcast can timeout on heavy loads to RMQ
(for example see aiidateam/aiida-core#4745).
This broadcast is not critical to the running of the process,
and so a timeout should not except it.

Also ensure the process PID is included in all log messages.
  • Loading branch information
chrisjsewell committed Feb 24, 2021
1 parent b1bde82 commit 76bf8da
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions plumpy/processes.py
Expand Up @@ -296,22 +296,22 @@ def init(self) -> None:
identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
except kiwipy.TimeoutError:
self.logger.exception('Process<%s> failed to register as an RPC subscriber', self.pid)
self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

try:
identifier = self._communicator.add_broadcast_subscriber(
self.broadcast_receive, identifier=str(self.pid)
)
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except kiwipy.TimeoutError:
self.logger.exception('Process<%s> failed to register as a broadcast subscriber', self.pid)
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)

if not self._future.done():

def try_killing(future: futures.Future) -> None:
if future.cancelled():
if not self.kill('Killed by future being cancelled'):
self.logger.warning('Failed to kill process on future cancel')
self.logger.warning('Process<%s>: Failed to kill process on future cancel', self.pid)

self._future.add_done_callback(try_killing)

Expand Down Expand Up @@ -690,12 +690,15 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
if self._communicator and isinstance(self.state, enum.Enum):
from_label = cast(enum.Enum, from_state.LABEL).value if from_state is not None else None
subject = 'state_changed.{}.{}'.format(from_label, self.state.value)
self.logger.info('Broadcasting state change of %d: %s', self.pid, subject)
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
except ConnectionClosed:
message = 'no connection available to broadcast state change from %s to %s'
self.logger.info(message, from_label, self.state.value)
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except kiwipy.TimeoutError:
message = 'Process<%s>: sending broadcast of state change from %s to %s timed out'
self.logger.warning(message, self.pid, from_label, self.state.value)

def on_exiting(self) -> None:
state = self.state
Expand Down Expand Up @@ -842,7 +845,7 @@ def on_close(self) -> None:
try:
cleanup()
except Exception: # pylint: disable=broad-except
self.logger.exception('Exception calling cleanup method %s', cleanup)
self.logger.exception('Process<%s>: Exception calling cleanup method %s', self.pid, cleanup)
self._cleanups = None
finally:
self._event_callbacks = {}
Expand All @@ -863,7 +866,7 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
:param msg: the message
:return: the outcome of processing the message, the return value will be sent back as a response to the sender
"""
self.logger.debug("RPC message '%s' received with communicator '%s'", msg, _comm)
self.logger.debug("Process<%s>: received RPC message with communicator '%s': %r", self.pid, _comm, msg)

intent = msg[process_comms.INTENT_KEY]

Expand All @@ -890,7 +893,9 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
:param msg: the message
"""
# pylint: disable=unused-argument
self.logger.debug("Broadcast message '%s' received with communicator '%s'", body, _comm)
self.logger.debug(
"Process<%s>: received broadcast message '%s' with communicator '%s': %r", self.pid, subject, _comm, body
)

# If we get a message we recognise then action it, otherwise ignore
if subject == process_comms.Intent.PLAY:
Expand Down

0 comments on commit 76bf8da

Please sign in to comment.