diff --git a/p2p/peer.py b/p2p/peer.py index eda46fa605..fde49b35de 100644 --- a/p2p/peer.py +++ b/p2p/peer.py @@ -383,14 +383,19 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) - raise UnexpectedMessage("Unexpected msg: {} ({})".format(cmd, msg)) def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None: + cmd_type = type(cmd) + if self._subscribers: - for subscriber in self._subscribers: - subscriber.add_msg((self, cmd, msg)) + was_added = tuple(subscriber.add_msg((self, cmd, msg)) for subscriber in self._subscribers) + if not any(was_added): + self.logger.warn( + "Peer %s has no subscribers for msg type %s", + self, + cmd_type.__name__, + ) else: self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd) - cmd_type = type(cmd) - if cmd_type in self.pending_requests: request, future = self.pending_requests[cmd_type] try: @@ -607,18 +612,20 @@ def add_msg(self, msg: 'PEER_MSG_TYPE') -> None: "subscriptions: %s", cmd, peer, self.subscription_msg_types, ) - return + return False try: if hasattr(self, 'logger'): self.logger.trace( # type: ignore "Adding %s msg from %s to queue; queue_size=%d", cmd, peer, self.queue_size) self.msg_queue.put_nowait(msg) + return True except asyncio.queues.QueueFull: if hasattr(self, 'logger'): self.logger.warn( # type: ignore "%s msg queue is full; discarding %s msg from %s", self.__class__.__name__, cmd, peer) + return False @contextlib.contextmanager def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]: