Skip to content

Commit

Permalink
p2p: Some improvements to PeerPoolSubscriber
Browse files Browse the repository at this point in the history
* Renamed to PeerSubscriber because that's what it really is
* New @AbstractMethod max_queue_size, to force each subscriber to define their own
* Peer now keeps a list of actual PeerSubscribers instead of just their msg queues
* PeerSubscriber provides APIs to add msgs to queue and check queue
  size, abstracting underlying implementation
* If an attempt is made to add a msg but the queue is full, we log a
  warning and drop the msg (Closes: ethereum#1032)
  • Loading branch information
gsalgado committed Jul 20, 2018
1 parent 9e97747 commit 1fe5381
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 40 deletions.
11 changes: 9 additions & 2 deletions p2p/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
from p2p.exceptions import NoEligiblePeers, OperationCancelled
from p2p.p2p_proto import DisconnectReason
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
from p2p.rlp import BlockBody
from p2p.service import BaseService
from p2p.utils import (
Expand All @@ -58,7 +58,7 @@
HeaderRequestingPeer = Union[LESPeer, ETHPeer]


class BaseHeaderChainSyncer(BaseService, PeerPoolSubscriber):
class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
"""
Sync with the Ethereum network by fetching/storing block headers.
Expand Down Expand Up @@ -90,6 +90,13 @@ def __init__(self,
self._new_headers: asyncio.Queue[Tuple[BlockHeader, ...]] = asyncio.Queue()
self._executor = get_process_pool_executor()

@property
def msg_queue_maxsize(self) -> int:
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
return 2000

def register_peer(self, peer: BasePeer) -> None:
self._sync_requests.put_nowait(cast(HeaderRequestingPeer, self.peer_pool.highest_td_peer))

Expand Down
12 changes: 9 additions & 3 deletions p2p/lightchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from p2p.peer import (
LESPeer,
PeerPool,
PeerPoolSubscriber,
PeerSubscriber,
)
from p2p.rlp import BlockBody
from p2p.service import (
Expand All @@ -69,7 +69,7 @@
from trinity.db.header import BaseAsyncHeaderDB # noqa: F401


class LightPeerChain(PeerPoolSubscriber, BaseService):
class LightPeerChain(PeerSubscriber, BaseService):
reply_timeout = REPLY_TIMEOUT
headerdb: 'BaseAsyncHeaderDB' = None

Expand All @@ -78,12 +78,18 @@ def __init__(
headerdb: 'BaseAsyncHeaderDB',
peer_pool: PeerPool,
token: CancelToken = None) -> None:
PeerPoolSubscriber.__init__(self)
PeerSubscriber.__init__(self)
BaseService.__init__(self, token)
self.headerdb = headerdb
self.peer_pool = peer_pool
self._pending_replies: Dict[int, Callable[[protocol._DecodedMsgType], None]] = {}

@property
def msg_queue_maxsize(self) -> int:
# Here we only care about replies to our requests, ignoring most msgs (which are supposed
# to be handled by the chain syncer), so our queue should never grow too much.
return 500

async def _run(self) -> None:
with self.subscribe(self.peer_pool):
while True:
Expand Down
54 changes: 37 additions & 17 deletions p2p/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def __init__(self,
self.headerdb = headerdb
self.network_id = network_id
self.inbound = inbound
self._subscribers: List['asyncio.Queue[PEER_MSG_TYPE]'] = []
self._subscribers: List[PeerSubscriber] = []

self.egress_mac = egress_mac
self.ingress_mac = ingress_mac
Expand All @@ -195,10 +195,10 @@ async def process_sub_proto_handshake(
self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
raise NotImplementedError("Must be implemented by subclasses")

def add_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
def add_subscriber(self, subscriber: 'PeerSubscriber') -> None:
self._subscribers.append(subscriber)

def remove_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
def remove_subscriber(self, subscriber: 'PeerSubscriber') -> None:
if subscriber in self._subscribers:
self._subscribers.remove(subscriber)

Expand Down Expand Up @@ -353,7 +353,7 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
if self._subscribers:
for subscriber in self._subscribers:
subscriber.put_nowait((self, cmd, msg))
subscriber.add_msg((self, cmd, msg))
else:
self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd)

Expand Down Expand Up @@ -608,28 +608,48 @@ def request_block_headers(self, block_number_or_hash: Union[int, bytes],
self.sub_proto.send_get_block_headers(block_number_or_hash, max_headers, reverse)


class PeerPoolSubscriber(ABC):
class PeerSubscriber(ABC):
_msg_queue: 'asyncio.Queue[PEER_MSG_TYPE]' = None

@property
@abstractmethod
def msg_queue_maxsize(self) -> int:
raise NotImplementedError("Must be implemented by subclasses")

def register_peer(self, peer: BasePeer) -> None:
"""
Notify about each registered peer in the :class:`~p2p.peer.PeerPool`. Is called upon
subscription for each :class:`~p2p.peer.BasePeer` that exists in the pool at that time and
then for each :class:`~p2p.peer.BasePeer` that joins the pool later on.
A :class:`~p2p.peer.PeerPoolSubscriber` that wants to act upon peer registration needs to
A :class:`~p2p.peer.PeerSubscriber` that wants to act upon peer registration needs to
overwrite this method to provide an implementation.
"""
pass

@property
def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
if self._msg_queue is None:
self._msg_queue = asyncio.Queue(maxsize=10000)
self._msg_queue = asyncio.Queue(maxsize=self.msg_queue_maxsize)
# Add a _name to our msg_queue so that the stats logged by PeerPool are more useful.
self._msg_queue._name = self.__class__.__name__ # type: ignore
return self._msg_queue

@property
def queue_size(self) -> int:
return self.msg_queue.qsize()

def add_msg(self, msg: 'PEER_MSG_TYPE') -> None:
peer, cmd, _ = msg
try:
self.logger.trace( # type: ignore
"Adding %s msg from %s to queue; queue_size=%d", cmd, peer, self.queue_size)
self.msg_queue.put_nowait(msg)
except asyncio.queues.QueueFull:
self.logger.warn( # type: ignore
"%s msg queue is full; discarding %s msg from %s",
self.__class__.__name__, cmd, peer)

@contextlib.contextmanager
def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]:
peer_pool.subscribe(self)
Expand Down Expand Up @@ -662,7 +682,7 @@ def __init__(self,
self.vm_configuration = vm_configuration
self.max_peers = max_peers
self.connected_nodes: Dict[Node, BasePeer] = {}
self._subscribers: List[PeerPoolSubscriber] = []
self._subscribers: List[PeerSubscriber] = []

def __len__(self) -> int:
return len(self.connected_nodes)
Expand All @@ -680,17 +700,17 @@ def is_valid_connection_candidate(self, candidate: Node) -> bool:
matching_ip_nodes = nodes_by_ip.get(candidate.address.ip, [])
return len(matching_ip_nodes) <= 2

def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
def subscribe(self, subscriber: PeerSubscriber) -> None:
self._subscribers.append(subscriber)
for peer in self.connected_nodes.values():
subscriber.register_peer(peer)
peer.add_subscriber(subscriber.msg_queue)
peer.add_subscriber(subscriber)

def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
def unsubscribe(self, subscriber: PeerSubscriber) -> None:
if subscriber in self._subscribers:
self._subscribers.remove(subscriber)
for peer in self.connected_nodes.values():
peer.remove_subscriber(subscriber.msg_queue)
peer.remove_subscriber(subscriber)

async def start_peer(self, peer: BasePeer) -> None:
try:
Expand All @@ -711,15 +731,15 @@ def _add_peer(self,
"""Add the given peer to the pool.
Appart from adding it to our list of connected nodes and adding each of our subscriber's
msg_queue to the peer, we also add the given messages to our subscriber's queues.
to the peer, we also add the given messages to our subscriber's queues.
"""
self.logger.info('Adding %s to pool', peer)
self.connected_nodes[peer.remote] = peer
for subscriber in self._subscribers:
subscriber.register_peer(peer)
peer.add_subscriber(subscriber.msg_queue)
peer.add_subscriber(subscriber)
for cmd, msg in msgs:
subscriber.msg_queue.put_nowait((peer, cmd, msg))
subscriber.add_msg((peer, cmd, msg))

async def _run(self) -> None:
# FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
Expand Down Expand Up @@ -875,9 +895,9 @@ async def _periodically_report_stats(self) -> None:
msg = "%s: running=%s, subscribers=%d" % (peer, peer.is_running, subscribers)
if subscribers:
longest_queue = max(
peer._subscribers, key=operator.methodcaller('qsize'))
peer._subscribers, key=operator.attrgetter('queue_size'))
msg += " longest_subscriber_queue=%s(%d)" % (
longest_queue._name, longest_queue.qsize()) # type: ignore
longest_queue.__class__.__name__, longest_queue.queue_size)
self.logger.debug(msg)
self.logger.debug("== End peer details == ")
try:
Expand Down
11 changes: 9 additions & 2 deletions p2p/shard_syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from p2p.service import BaseService
from p2p.peer import (
PeerPool,
PeerPoolSubscriber,
PeerSubscriber,
)

from p2p.sharding_peer import (
Expand All @@ -68,7 +68,7 @@
COLLATION_PERIOD = 1


class ShardSyncer(BaseService, PeerPoolSubscriber):
class ShardSyncer(BaseService, PeerSubscriber):

def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) -> None:
super().__init__(token)
Expand All @@ -80,6 +80,13 @@ def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) -

self.start_time = time.time()

@property
def msg_queue_maxsize(self) -> int:
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
return 2000

async def _cleanup(self) -> None:
pass

Expand Down
11 changes: 9 additions & 2 deletions p2p/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from p2p.chain import PeerRequestHandler
from p2p.cancel_token import CancelToken
from p2p.exceptions import OperationCancelled
from p2p.peer import ETHPeer, PeerPool, PeerPoolSubscriber
from p2p.peer import ETHPeer, PeerPool, PeerSubscriber
from p2p.service import BaseService
from p2p.utils import get_process_pool_executor

Expand All @@ -51,7 +51,7 @@
from trinity.db.chain import AsyncChainDB # noqa: F401


class StateDownloader(BaseService, PeerPoolSubscriber):
class StateDownloader(BaseService, PeerSubscriber):
_pending_nodes: Dict[Any, float] = {}
_total_processed_nodes = 0
_report_interval = 10 # Number of seconds between progress reports.
Expand All @@ -74,6 +74,13 @@ def __init__(self,
self._peers_with_pending_requests: Dict[ETHPeer, float] = {}
self._executor = get_process_pool_executor()

@property
def msg_queue_maxsize(self) -> int:
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
return 2000

async def _get_idle_peers(self) -> List[ETHPeer]:
# FIXME: Should probably use get_peers() and pass the TD of our head? It's not really
# necessary because peers that are behind us may very well have the trie nodes we want.
Expand Down
11 changes: 10 additions & 1 deletion tests/p2p/peer_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List

from eth_hash.auto import keccak
from eth.utils.logging import TraceLogger

from eth.chains.mainnet import MAINNET_GENESIS_HEADER
from eth.db.backends.memory import MemoryDB
Expand All @@ -12,7 +13,7 @@
from p2p import ecies
from p2p import kademlia
from p2p.cancel_token import CancelToken
from p2p.peer import BasePeer, LESPeer, PeerPool
from p2p.peer import BasePeer, LESPeer, PeerPool, PeerSubscriber
from p2p.server import decode_authentication

from integration_test_helpers import FakeAsyncHeaderDB
Expand Down Expand Up @@ -160,3 +161,11 @@ def __init__(self, peers: List[BasePeer]) -> None:

async def _run(self) -> None:
raise NotImplementedError("This is a mock PeerPool implementation, you must not _run() it")


class SamplePeerSubscriber(PeerSubscriber):
logger = TraceLogger("")

@property
def msg_queue_maxsize(self) -> int:
return 100
15 changes: 8 additions & 7 deletions tests/p2p/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
get_directly_linked_peers,
get_directly_linked_peers_without_handshake,
MockPeerPoolWithConnectedPeers,
SamplePeerSubscriber,
)

from cytoolz import (
Expand Down Expand Up @@ -203,7 +204,7 @@ async def test_new_collations_notification(request, event_loop):
# setup a-b-c topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
peer_b_c, peer_c_b = await get_directly_linked_sharding_peers(request, event_loop)
peer_c_b_subscriber = asyncio.Queue()
peer_c_b_subscriber = SamplePeerSubscriber()
peer_c_b.add_subscriber(peer_c_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a, peer_b_c])

Expand All @@ -223,7 +224,7 @@ def finalizer():
c1 = next(collations)
peer_a_b.sub_proto.send_collations(0, [c1])
peer, cmd, msg = await asyncio.wait_for(
peer_c_b_subscriber.get(),
peer_c_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_c_b
Expand All @@ -234,7 +235,7 @@ def finalizer():
c2 = next(collations)
peer_a_b.sub_proto.send_collations(0, [c1, c2])
peer, cmd, msg = await asyncio.wait_for(
peer_c_b_subscriber.get(),
peer_c_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_c_b
Expand All @@ -246,7 +247,7 @@ def finalizer():
async def test_syncer_requests_new_collations(request, event_loop):
# setup a-b topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
peer_a_b_subscriber = asyncio.Queue()
peer_a_b_subscriber = SamplePeerSubscriber()
peer_a_b.add_subscriber(peer_a_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])

Expand All @@ -266,7 +267,7 @@ def finalizer():
hashes_and_periods = ((b"\xaa" * 32, 0),)
peer_a_b.sub_proto.send_new_collation_hashes(hashes_and_periods)
peer, cmd, msg = await asyncio.wait_for(
peer_a_b_subscriber.get(),
peer_a_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_a_b
Expand All @@ -278,7 +279,7 @@ def finalizer():
async def test_syncer_proposing(request, event_loop):
# setup a-b topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
peer_a_b_subscriber = asyncio.Queue()
peer_a_b_subscriber = SamplePeerSubscriber()
peer_a_b.add_subscriber(peer_a_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])

Expand All @@ -297,7 +298,7 @@ def finalizer():
# propose at b and check that it announces its proposal
await syncer.propose()
peer, cmd, msg = await asyncio.wait_for(
peer_a_b_subscriber.get(),
peer_a_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_a_b
Expand Down
Loading

0 comments on commit 1fe5381

Please sign in to comment.