diff --git a/plumpy/processes.py b/plumpy/processes.py index 783ac5f1..5dee8210 100644 --- a/plumpy/processes.py +++ b/plumpy/processes.py @@ -296,7 +296,7 @@ def init(self) -> None: identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier)) except kiwipy.TimeoutError: - self.logger.exception('Process<%s> failed to register as an RPC subscriber', self.pid) + self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid) try: identifier = self._communicator.add_broadcast_subscriber( @@ -304,14 +304,14 @@ def init(self) -> None: ) self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier)) except kiwipy.TimeoutError: - self.logger.exception('Process<%s> failed to register as a broadcast subscriber', self.pid) + self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid) if not self._future.done(): def try_killing(future: futures.Future) -> None: if future.cancelled(): if not self.kill('Killed by future being cancelled'): - self.logger.warning('Failed to kill process on future cancel') + self.logger.warning('Process<%s>: Failed to kill process on future cancel', self.pid) self._future.add_done_callback(try_killing) @@ -690,12 +690,15 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None: if self._communicator and isinstance(self.state, enum.Enum): from_label = cast(enum.Enum, from_state.LABEL).value if from_state is not None else None subject = 'state_changed.{}.{}'.format(from_label, self.state.value) - self.logger.info('Broadcasting state change of %d: %s', self.pid, subject) + self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject) try: self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject) except ConnectionClosed: - message = 'no connection available to broadcast state change from %s to %s' - self.logger.info(message, from_label, self.state.value) + message = 'Process<%s>: no connection available to broadcast state change from %s to %s' + self.logger.warning(message, self.pid, from_label, self.state.value) + except kiwipy.TimeoutError: + message = 'Process<%s>: sending broadcast of state change from %s to %s timed out' + self.logger.warning(message, self.pid, from_label, self.state.value) def on_exiting(self) -> None: state = self.state @@ -842,7 +845,7 @@ def on_close(self) -> None: try: cleanup() except Exception: # pylint: disable=broad-except - self.logger.exception('Exception calling cleanup method %s', cleanup) + self.logger.exception('Process<%s>: Exception calling cleanup method %s', self.pid, cleanup) self._cleanups = None finally: self._event_callbacks = {} @@ -863,7 +866,7 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An :param msg: the message :return: the outcome of processing the message, the return value will be sent back as a response to the sender """ - self.logger.debug("RPC message '%s' received with communicator '%s'", msg, _comm) + self.logger.debug("Process<%s>: received RPC message with communicator '%s': %r", self.pid, _comm, msg) intent = msg[process_comms.INTENT_KEY] @@ -890,7 +893,9 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any, :param msg: the message """ # pylint: disable=unused-argument - self.logger.debug("Broadcast message '%s' received with communicator '%s'", body, _comm) + self.logger.debug( + "Process<%s>: received broadcast message '%s' with communicator '%s': %r", self.pid, subject, _comm, body + ) # If we get a message we recognise then action it, otherwise ignore if subject == process_comms.Intent.PLAY: