diff --git a/p2p/chain.py b/p2p/chain.py
index 40c6463e17..07220a3f15 100644
--- a/p2p/chain.py
+++ b/p2p/chain.py
@@ -41,7 +41,7 @@
 from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
 from p2p.exceptions import NoEligiblePeers, OperationCancelled
 from p2p.p2p_proto import DisconnectReason
-from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
+from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
 from p2p.rlp import BlockBody
 from p2p.service import BaseService
 from p2p.utils import (
@@ -58,7 +58,7 @@
 HeaderRequestingPeer = Union[LESPeer, ETHPeer]
 
 
-class BaseHeaderChainSyncer(BaseService, PeerPoolSubscriber):
+class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
     """
     Sync with the Ethereum network by fetching/storing block headers.
 
@@ -90,6 +90,13 @@ def __init__(self,
         self._new_headers: asyncio.Queue[Tuple[BlockHeader, ...]] = asyncio.Queue()
         self._executor = get_process_pool_executor()
 
+    @property
+    def msg_queue_maxsize(self) -> int:
+        # This is a rather arbitrary value, but when the sync is operating normally we never see
+        # the msg queue grow past a few hundred items, so this should be a reasonable limit for
+        # now.
+        return 2000
+
     def register_peer(self, peer: BasePeer) -> None:
         self._sync_requests.put_nowait(cast(HeaderRequestingPeer, self.peer_pool.highest_td_peer))
diff --git a/p2p/lightchain.py b/p2p/lightchain.py
index 70e2f9c5d8..071b2a11ce 100644
--- a/p2p/lightchain.py
+++ b/p2p/lightchain.py
@@ -56,7 +56,7 @@
 from p2p.peer import (
     LESPeer,
     PeerPool,
-    PeerPoolSubscriber,
+    PeerSubscriber,
 )
 from p2p.rlp import BlockBody
 from p2p.service import (
@@ -69,7 +69,7 @@
 from trinity.db.header import BaseAsyncHeaderDB  # noqa: F401
 
 
-class LightPeerChain(PeerPoolSubscriber, BaseService):
+class LightPeerChain(PeerSubscriber, BaseService):
     reply_timeout = REPLY_TIMEOUT
     headerdb: 'BaseAsyncHeaderDB' = None
 
@@ -78,12 +78,18 @@ def __init__(
             self,
             headerdb: 'BaseAsyncHeaderDB',
             peer_pool: PeerPool,
             token: CancelToken = None) -> None:
-        PeerPoolSubscriber.__init__(self)
+        PeerSubscriber.__init__(self)
         BaseService.__init__(self, token)
         self.headerdb = headerdb
         self.peer_pool = peer_pool
         self._pending_replies: Dict[int, Callable[[protocol._DecodedMsgType], None]] = {}
 
+    @property
+    def msg_queue_maxsize(self) -> int:
+        # Here we only care about replies to our requests, ignoring most msgs (which are supposed
+        # to be handled by the chain syncer), so our queue should never grow too much.
+        return 500
+
     async def _run(self) -> None:
         with self.subscribe(self.peer_pool):
             while True:
diff --git a/p2p/peer.py b/p2p/peer.py
index cf2eed9d83..efe4cda3da 100644
--- a/p2p/peer.py
+++ b/p2p/peer.py
@@ -174,7 +174,7 @@ def __init__(self,
         self.headerdb = headerdb
         self.network_id = network_id
         self.inbound = inbound
-        self._subscribers: List['asyncio.Queue[PEER_MSG_TYPE]'] = []
+        self._subscribers: List[PeerSubscriber] = []
         self.egress_mac = egress_mac
         self.ingress_mac = ingress_mac
 
@@ -195,10 +195,10 @@ async def process_sub_proto_handshake(
             self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
         raise NotImplementedError("Must be implemented by subclasses")
 
-    def add_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
+    def add_subscriber(self, subscriber: 'PeerSubscriber') -> None:
         self._subscribers.append(subscriber)
 
-    def remove_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
+    def remove_subscriber(self, subscriber: 'PeerSubscriber') -> None:
         if subscriber in self._subscribers:
             self._subscribers.remove(subscriber)
 
@@ -353,7 +353,7 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
     def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
         if self._subscribers:
             for subscriber in self._subscribers:
-                subscriber.put_nowait((self, cmd, msg))
+                subscriber.add_msg((self, cmd, msg))
         else:
             self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd)
 
@@ -608,16 +608,21 @@ def request_block_headers(self, block_number_or_hash: Union[int, bytes],
         self.sub_proto.send_get_block_headers(block_number_or_hash, max_headers, reverse)
 
 
-class PeerPoolSubscriber(ABC):
+class PeerSubscriber(ABC):
     _msg_queue: 'asyncio.Queue[PEER_MSG_TYPE]' = None
 
+    @property
+    @abstractmethod
+    def msg_queue_maxsize(self) -> int:
+        raise NotImplementedError("Must be implemented by subclasses")
+
     def register_peer(self, peer: BasePeer) -> None:
         """
         Notify about each registered peer in the :class:`~p2p.peer.PeerPool`. Is called upon
         subscription for each :class:`~p2p.peer.BasePeer` that exists in the pool at that time and
         then for each :class:`~p2p.peer.BasePeer` that joins the pool later on.
 
-        A :class:`~p2p.peer.PeerPoolSubscriber` that wants to act upon peer registration needs to
+        A :class:`~p2p.peer.PeerSubscriber` that wants to act upon peer registration needs to
         overwrite this method to provide an implementation.
         """
         pass
 
@@ -625,11 +630,24 @@ def register_peer(self, peer: BasePeer) -> None:
     @property
     def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
         if self._msg_queue is None:
-            self._msg_queue = asyncio.Queue(maxsize=10000)
-            # Add a _name to our msg_queue so that the stats logged by PeerPool are more useful.
-            self._msg_queue._name = self.__class__.__name__  # type: ignore
+            self._msg_queue = asyncio.Queue(maxsize=self.msg_queue_maxsize)
         return self._msg_queue
 
+    @property
+    def queue_size(self) -> int:
+        return self.msg_queue.qsize()
+
+    def add_msg(self, msg: 'PEER_MSG_TYPE') -> None:
+        peer, cmd, _ = msg
+        try:
+            self.logger.trace(  # type: ignore
+                "Adding %s msg from %s to queue; queue_size=%d", cmd, peer, self.queue_size)
+            self.msg_queue.put_nowait(msg)
+        except asyncio.queues.QueueFull:
+            self.logger.warn(  # type: ignore
+                "%s msg queue is full; discarding %s msg from %s",
+                self.__class__.__name__, cmd, peer)
+
     @contextlib.contextmanager
     def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]:
         peer_pool.subscribe(self)
@@ -662,7 +680,7 @@ def __init__(self,
         self.vm_configuration = vm_configuration
         self.max_peers = max_peers
         self.connected_nodes: Dict[Node, BasePeer] = {}
-        self._subscribers: List[PeerPoolSubscriber] = []
+        self._subscribers: List[PeerSubscriber] = []
 
     def __len__(self) -> int:
         return len(self.connected_nodes)
 
@@ -680,17 +698,17 @@ def is_valid_connection_candidate(self, candidate: Node) -> bool:
         matching_ip_nodes = nodes_by_ip.get(candidate.address.ip, [])
         return len(matching_ip_nodes) <= 2
 
-    def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
+    def subscribe(self, subscriber: PeerSubscriber) -> None:
         self._subscribers.append(subscriber)
         for peer in self.connected_nodes.values():
             subscriber.register_peer(peer)
-            peer.add_subscriber(subscriber.msg_queue)
+            peer.add_subscriber(subscriber)
 
-    def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
+    def unsubscribe(self, subscriber: PeerSubscriber) -> None:
         if subscriber in self._subscribers:
             self._subscribers.remove(subscriber)
         for peer in self.connected_nodes.values():
-            peer.remove_subscriber(subscriber.msg_queue)
+            peer.remove_subscriber(subscriber)
 
     async def start_peer(self, peer: BasePeer) -> None:
         try:
@@ -711,15 +729,15 @@ def _add_peer(self,
         """Add the given peer to the pool.
 
-        Appart from adding it to our list of connected nodes and adding each of our subscriber's
-        msg_queue to the peer, we also add the given messages to our subscriber's queues.
+        Apart from adding it to our list of connected nodes and adding each of our subscribers
+        to the peer, we also add the given messages to our subscribers' queues.
""" self.logger.info('Adding %s to pool', peer) self.connected_nodes[peer.remote] = peer for subscriber in self._subscribers: subscriber.register_peer(peer) - peer.add_subscriber(subscriber.msg_queue) + peer.add_subscriber(subscriber) for cmd, msg in msgs: - subscriber.msg_queue.put_nowait((peer, cmd, msg)) + subscriber.add_msg((peer, cmd, msg)) async def _run(self) -> None: # FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it @@ -875,9 +893,9 @@ async def _periodically_report_stats(self) -> None: msg = "%s: running=%s, subscribers=%d" % (peer, peer.is_running, subscribers) if subscribers: longest_queue = max( - peer._subscribers, key=operator.methodcaller('qsize')) + peer._subscribers, key=operator.attrgetter('queue_size')) msg += " longest_subscriber_queue=%s(%d)" % ( - longest_queue._name, longest_queue.qsize()) # type: ignore + longest_queue.__class__.__name__, longest_queue.queue_size) self.logger.debug(msg) self.logger.debug("== End peer details == ") try: diff --git a/p2p/shard_syncer.py b/p2p/shard_syncer.py index 9005f2e1e2..303b81545c 100644 --- a/p2p/shard_syncer.py +++ b/p2p/shard_syncer.py @@ -43,7 +43,7 @@ from p2p.service import BaseService from p2p.peer import ( PeerPool, - PeerPoolSubscriber, + PeerSubscriber, ) from p2p.sharding_peer import ( @@ -68,7 +68,7 @@ COLLATION_PERIOD = 1 -class ShardSyncer(BaseService, PeerPoolSubscriber): +class ShardSyncer(BaseService, PeerSubscriber): def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) -> None: super().__init__(token) @@ -80,6 +80,13 @@ def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) - self.start_time = time.time() + @property + def msg_queue_maxsize(self) -> int: + # This is a rather arbitrary value, but when the sync is operating normally we never see + # the msg queue grow past a few hundred items, so this should be a reasonable limit for + # now. + return 2000 + async def _cleanup(self) -> None: pass diff --git a/p2p/state.py b/p2p/state.py index e40f1f8f3d..85f85d82f6 100644 --- a/p2p/state.py +++ b/p2p/state.py @@ -42,7 +42,7 @@ from p2p.chain import PeerRequestHandler from p2p.cancel_token import CancelToken from p2p.exceptions import OperationCancelled -from p2p.peer import ETHPeer, PeerPool, PeerPoolSubscriber +from p2p.peer import ETHPeer, PeerPool, PeerSubscriber from p2p.service import BaseService from p2p.utils import get_process_pool_executor @@ -51,7 +51,7 @@ from trinity.db.chain import AsyncChainDB # noqa: F401 -class StateDownloader(BaseService, PeerPoolSubscriber): +class StateDownloader(BaseService, PeerSubscriber): _pending_nodes: Dict[Any, float] = {} _total_processed_nodes = 0 _report_interval = 10 # Number of seconds between progress reports. @@ -74,6 +74,13 @@ def __init__(self, self._peers_with_pending_requests: Dict[ETHPeer, float] = {} self._executor = get_process_pool_executor() + @property + def msg_queue_maxsize(self) -> int: + # This is a rather arbitrary value, but when the sync is operating normally we never see + # the msg queue grow past a few hundred items, so this should be a reasonable limit for + # now. + return 2000 + async def _get_idle_peers(self) -> List[ETHPeer]: # FIXME: Should probably use get_peers() and pass the TD of our head? It's not really # necessary because peers that are behind us may very well have the trie nodes we want. 
diff --git a/tests/p2p/peer_helpers.py b/tests/p2p/peer_helpers.py
index 980ac5a635..a78d3c39ab 100644
--- a/tests/p2p/peer_helpers.py
+++ b/tests/p2p/peer_helpers.py
@@ -3,6 +3,7 @@
 from typing import List
 
 from eth_hash.auto import keccak
+from eth.utils.logging import TraceLogger
 
 from eth.chains.mainnet import MAINNET_GENESIS_HEADER
 from eth.db.backends.memory import MemoryDB
@@ -12,7 +13,7 @@
 from p2p import ecies
 from p2p import kademlia
 from p2p.cancel_token import CancelToken
-from p2p.peer import BasePeer, LESPeer, PeerPool
+from p2p.peer import BasePeer, LESPeer, PeerPool, PeerSubscriber
 from p2p.server import decode_authentication
 
 from integration_test_helpers import FakeAsyncHeaderDB
@@ -160,3 +161,11 @@ def __init__(self, peers: List[BasePeer]) -> None:
 
     async def _run(self) -> None:
         raise NotImplementedError("This is a mock PeerPool implementation, you must not _run() it")
+
+
+class SamplePeerSubscriber(PeerSubscriber):
+    logger = TraceLogger("")
+
+    @property
+    def msg_queue_maxsize(self) -> int:
+        return 100
diff --git a/tests/p2p/test_sharding.py b/tests/p2p/test_sharding.py
index deaaae5e6f..f2b7b59579 100644
--- a/tests/p2p/test_sharding.py
+++ b/tests/p2p/test_sharding.py
@@ -44,6 +44,7 @@
     get_directly_linked_peers,
     get_directly_linked_peers_without_handshake,
     MockPeerPoolWithConnectedPeers,
+    SamplePeerSubscriber,
 )
 
 from cytoolz import (
@@ -203,7 +204,7 @@ async def test_new_collations_notification(request, event_loop):
     # setup a-b-c topology
     peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
     peer_b_c, peer_c_b = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_c_b_subscriber = asyncio.Queue()
+    peer_c_b_subscriber = SamplePeerSubscriber()
     peer_c_b.add_subscriber(peer_c_b_subscriber)
 
     peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a, peer_b_c])
@@ -223,7 +224,7 @@ def finalizer():
     c1 = next(collations)
     peer_a_b.sub_proto.send_collations(0, [c1])
     peer, cmd, msg = await asyncio.wait_for(
-        peer_c_b_subscriber.get(),
+        peer_c_b_subscriber.msg_queue.get(),
         timeout=1,
     )
     assert peer == peer_c_b
@@ -234,7 +235,7 @@ def finalizer():
     c2 = next(collations)
     peer_a_b.sub_proto.send_collations(0, [c1, c2])
     peer, cmd, msg = await asyncio.wait_for(
-        peer_c_b_subscriber.get(),
+        peer_c_b_subscriber.msg_queue.get(),
         timeout=1,
     )
     assert peer == peer_c_b
@@ -246,7 +247,7 @@ async def test_syncer_requests_new_collations(request, event_loop):
     # setup a-b topology
     peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_a_b_subscriber = asyncio.Queue()
+    peer_a_b_subscriber = SamplePeerSubscriber()
     peer_a_b.add_subscriber(peer_a_b_subscriber)
 
     peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])
@@ -266,7 +267,7 @@ def finalizer():
     hashes_and_periods = ((b"\xaa" * 32, 0),)
     peer_a_b.sub_proto.send_new_collation_hashes(hashes_and_periods)
     peer, cmd, msg = await asyncio.wait_for(
-        peer_a_b_subscriber.get(),
+        peer_a_b_subscriber.msg_queue.get(),
         timeout=1,
     )
     assert peer == peer_a_b
@@ -278,7 +279,7 @@ async def test_syncer_proposing(request, event_loop):
    # setup a-b topology
     peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_a_b_subscriber = asyncio.Queue()
+    peer_a_b_subscriber = SamplePeerSubscriber()
     peer_a_b.add_subscriber(peer_a_b_subscriber)
 
     peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])
@@ -297,7 +298,7 @@ def finalizer():
     # propose at b and check that it announces its proposal
     await syncer.propose()
     peer, cmd, msg = await asyncio.wait_for(
-        peer_a_b_subscriber.get(),
+        peer_a_b_subscriber.msg_queue.get(),
         timeout=1,
     )
     assert peer == peer_a_b
diff --git a/tests/p2p/test_tx_pool.py b/tests/p2p/test_tx_pool.py
index b84b314e92..eb41cf896e 100644
--- a/tests/p2p/test_tx_pool.py
+++ b/tests/p2p/test_tx_pool.py
@@ -26,6 +26,7 @@
 from tests.p2p.peer_helpers import (
     get_directly_linked_peers,
     MockPeerPoolWithConnectedPeers,
+    SamplePeerSubscriber,
 )
 
 # TODO: Move this file into the trinity tests (Requires refactor of peer_helpers)
@@ -166,7 +167,7 @@ async def test_tx_sending(request, event_loop, chain_with_block_validation, tx_v
         peer2_class=ETHPeer,
     )
 
-    peer2_subscriber = asyncio.Queue()
+    peer2_subscriber = SamplePeerSubscriber()
     peer2.add_subscriber(peer2_subscriber)
 
     pool = TxPool(MockPeerPoolWithConnectedPeers([peer1, peer2]), tx_validator)
@@ -183,7 +184,7 @@ def finalizer():
 
     # Ensure that peer2 gets the transactions
     peer, cmd, msg = await asyncio.wait_for(
-        peer2_subscriber.get(),
+        peer2_subscriber.msg_queue.get(),
         timeout=0.1,
     )
diff --git a/trinity/plugins/builtin/tx_pool/pool.py b/trinity/plugins/builtin/tx_pool/pool.py
index 33ca270e76..63442eb373 100644
--- a/trinity/plugins/builtin/tx_pool/pool.py
+++ b/trinity/plugins/builtin/tx_pool/pool.py
@@ -21,14 +21,14 @@
     BasePeer,
     ETHPeer,
     PeerPool,
-    PeerPoolSubscriber,
+    PeerSubscriber,
 )
 from p2p.service import (
     BaseService
 )
 
 
-class TxPool(BaseService, PeerPoolSubscriber):
+class TxPool(BaseService, PeerSubscriber):
     """
     The :class:`~trinity.tx_pool.pool.TxPool` class is responsible for holding and relaying
     of transactions, represented as :class:`~eth.rlp.transactions.BaseTransaction` among the
@@ -56,8 +56,12 @@ def __init__(self,
         self._bloom = BloomFilter(max_elements=1000000)
         self._bloom_salt = str(uuid.uuid4())
 
-    def register_peer(self, peer: BasePeer) -> None:
-        pass
+    @property
+    def msg_queue_maxsize(self) -> int:
+        # This is a rather arbitrary value, but when the sync is operating normally we never see
+        # the msg queue grow past a few hundred items, so this should be a reasonable limit for
+        # now.
+        return 2000
 
     async def _run(self) -> None:
         self.logger.info("Running Tx Pool")
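
Usage sketch (illustrative, not part of the patch): the reworked API replaces raw asyncio.Queue subscriptions with PeerSubscriber objects that size their own bounded queue. ExampleSubscriber, its logger name, and handle_msgs below are hypothetical; the pattern follows SamplePeerSubscriber and the updated tests above.

from eth.utils.logging import TraceLogger

from p2p.peer import PeerPool, PeerSubscriber


class ExampleSubscriber(PeerSubscriber):
    # PeerSubscriber.add_msg() logs via self.logger but does not define one,
    # so a concrete subclass must provide it (cf. SamplePeerSubscriber above).
    logger = TraceLogger("example")

    @property
    def msg_queue_maxsize(self) -> int:
        # Illustrative limit: once the queue is full, add_msg() discards new
        # messages with a warning rather than letting the queue grow unboundedly.
        return 2000

    async def handle_msgs(self, peer_pool: PeerPool) -> None:
        # subscribe() registers this subscriber with every current and future
        # peer in the pool; each peer then delivers (peer, cmd, msg) tuples to
        # our msg_queue via add_msg().
        with self.subscribe(peer_pool):
            while True:
                peer, cmd, msg = await self.msg_queue.get()
                self.logger.debug("Got %s msg from %s", cmd, peer)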