p2p: Some improvements to PeerPoolSubscriber #1056

Merged: 1 commit, Jul 21, 2018
p2p/chain.py (11 changes: 9 additions & 2 deletions)
@@ -41,7 +41,7 @@
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
from p2p.exceptions import NoEligiblePeers, OperationCancelled
from p2p.p2p_proto import DisconnectReason
-from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
+from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
from p2p.rlp import BlockBody
from p2p.service import BaseService
from p2p.utils import (
@@ -58,7 +58,7 @@
HeaderRequestingPeer = Union[LESPeer, ETHPeer]


-class BaseHeaderChainSyncer(BaseService, PeerPoolSubscriber):
+class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
"""
Sync with the Ethereum network by fetching/storing block headers.

@@ -90,6 +90,13 @@ def __init__(self,
self._new_headers: asyncio.Queue[Tuple[BlockHeader, ...]] = asyncio.Queue()
self._executor = get_process_pool_executor()

+    @property
+    def msg_queue_maxsize(self) -> int:
+        # This is a rather arbitrary value, but when the sync is operating normally we never see
+        # the msg queue grow past a few hundred items, so this should be a reasonable limit for
+        # now.
+        return 2000

    def register_peer(self, peer: BasePeer) -> None:
        self._sync_requests.put_nowait(cast(HeaderRequestingPeer, self.peer_pool.highest_td_peer))

p2p/lightchain.py (12 changes: 9 additions & 3 deletions)
@@ -56,7 +56,7 @@
from p2p.peer import (
    LESPeer,
    PeerPool,
-    PeerPoolSubscriber,
+    PeerSubscriber,
)
from p2p.rlp import BlockBody
from p2p.service import (
@@ -69,7 +69,7 @@
from trinity.db.header import BaseAsyncHeaderDB # noqa: F401


-class LightPeerChain(PeerPoolSubscriber, BaseService):
+class LightPeerChain(PeerSubscriber, BaseService):
    reply_timeout = REPLY_TIMEOUT
    headerdb: 'BaseAsyncHeaderDB' = None

@@ -78,12 +78,18 @@ def __init__(
headerdb: 'BaseAsyncHeaderDB',
peer_pool: PeerPool,
token: CancelToken = None) -> None:
-        PeerPoolSubscriber.__init__(self)
+        PeerSubscriber.__init__(self)
BaseService.__init__(self, token)
self.headerdb = headerdb
self.peer_pool = peer_pool
self._pending_replies: Dict[int, Callable[[protocol._DecodedMsgType], None]] = {}

+    @property
+    def msg_queue_maxsize(self) -> int:
+        # Here we only care about replies to our requests, ignoring most msgs (which are supposed
+        # to be handled by the chain syncer), so our queue should never grow too much.
+        return 500

    async def _run(self) -> None:
        with self.subscribe(self.peer_pool):
            while True:
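Illustration (not part of the diff): the pattern LightPeerChain uses above, subscribing for the duration of _run() and pulling (peer, cmd, msg) tuples off its bounded queue, boils down to the following minimal sketch. ToyPool and ToySubscriber are hypothetical stand-ins for PeerPool and PeerSubscriber, using only the standard library.

    import asyncio
    import contextlib
    from typing import Any, List, Tuple

    Msg = Tuple[Any, Any, Any]  # (peer, cmd, msg), as in p2p.peer

    class ToyPool:
        # Stand-in for PeerPool: tracks subscriber queues, fans messages out.
        def __init__(self) -> None:
            self._queues: List[asyncio.Queue] = []

        def subscribe(self, q: asyncio.Queue) -> None:
            self._queues.append(q)

        def unsubscribe(self, q: asyncio.Queue) -> None:
            self._queues.remove(q)

        def broadcast(self, item: Msg) -> None:
            for q in self._queues:
                q.put_nowait(item)

    class ToySubscriber:
        # Stand-in for PeerSubscriber with a bounded msg_queue.
        def __init__(self, maxsize: int) -> None:
            self.msg_queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

        @contextlib.contextmanager
        def subscribe(self, pool: ToyPool):
            # Mirrors PeerSubscriber.subscribe(): register on entry, clean up on exit.
            pool.subscribe(self.msg_queue)
            try:
                yield
            finally:
                pool.unsubscribe(self.msg_queue)

    async def main() -> None:
        pool, sub = ToyPool(), ToySubscriber(maxsize=500)
        with sub.subscribe(pool):
            pool.broadcast(("peer-1", "BlockHeaders", {}))
            peer, cmd, msg = await sub.msg_queue.get()
            print(peer, cmd, msg)  # peer-1 BlockHeaders {}

    asyncio.run(main())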
p2p/peer.py (56 changes: 37 additions & 19 deletions)
@@ -174,7 +174,7 @@ def __init__(self,
self.headerdb = headerdb
self.network_id = network_id
self.inbound = inbound
-        self._subscribers: List['asyncio.Queue[PEER_MSG_TYPE]'] = []
+        self._subscribers: List[PeerSubscriber] = []

self.egress_mac = egress_mac
self.ingress_mac = ingress_mac
@@ -195,10 +195,10 @@ async def process_sub_proto_handshake(
self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
raise NotImplementedError("Must be implemented by subclasses")

-    def add_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
+    def add_subscriber(self, subscriber: 'PeerSubscriber') -> None:
        self._subscribers.append(subscriber)

-    def remove_subscriber(self, subscriber: 'asyncio.Queue[PEER_MSG_TYPE]') -> None:
+    def remove_subscriber(self, subscriber: 'PeerSubscriber') -> None:
        if subscriber in self._subscribers:
            self._subscribers.remove(subscriber)

@@ -353,7 +353,7 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
    def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
        if self._subscribers:
            for subscriber in self._subscribers:
-                subscriber.put_nowait((self, cmd, msg))
+                subscriber.add_msg((self, cmd, msg))
        else:
            self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd)

@@ -608,28 +608,46 @@ def request_block_headers(self, block_number_or_hash: Union[int, bytes],
self.sub_proto.send_get_block_headers(block_number_or_hash, max_headers, reverse)


-class PeerPoolSubscriber(ABC):
+class PeerSubscriber(ABC):
    _msg_queue: 'asyncio.Queue[PEER_MSG_TYPE]' = None

+    @property
+    @abstractmethod
+    def msg_queue_maxsize(self) -> int:
+        raise NotImplementedError("Must be implemented by subclasses")

Review thread on the new msg_queue_maxsize property:

[Member] I've been a fan of the following pattern.

    class Thing:
        @abstractmethod
        def get_value(self):
            ...

        @property
        def value(self):
            return self.get_value()

It allows for slightly less verbose subclassing and still gives all the niceties of having property accessors. I believe this is the pattern we're using for properties on our Transaction classes.

[@carver, Contributor, Jul 20, 2018]

> slightly less verbose subclassing

Meaning the subclass doesn't have to add the @property decorator?

> I've been a fan of the following pattern.

I'm not a fan. I found it sort of confusing when reading the Transaction subclasses, because we define get_intrinsic_gas in the subclass, but then callers would just use intrinsic_gas. Paranoid me feels the need to go back and make sure that the superclass defines it to be exactly the same, and make sure that one of the three (caller, subclass, superclass) doesn't just have a bug. If they are exactly the same, then why have both options?

[Member] That is fair. I'd consider those concerns reason enough to abandon the pattern.
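Illustration (not part of the diff): a minimal sketch of the pattern the PR adopts instead of the get_value()/value pairing discussed above. The base class declares an abstract property and each subclass overrides it under the same name that callers use, so there is only one name to keep in sync (Subscriber and ChainSyncer here are toy stand-ins, standard library only):

    from abc import ABC, abstractmethod

    class Subscriber(ABC):
        # One name, declared abstract in the base class...
        @property
        @abstractmethod
        def msg_queue_maxsize(self) -> int:
            ...

    class ChainSyncer(Subscriber):
        # ...and overridden under that same name in the subclass.
        @property
        def msg_queue_maxsize(self) -> int:
            return 2000

    print(ChainSyncer().msg_queue_maxsize)  # 2000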

    def register_peer(self, peer: BasePeer) -> None:
        """
        Notify about each registered peer in the :class:`~p2p.peer.PeerPool`. Is called upon
        subscription for each :class:`~p2p.peer.BasePeer` that exists in the pool at that time and
        then for each :class:`~p2p.peer.BasePeer` that joins the pool later on.

-        A :class:`~p2p.peer.PeerPoolSubscriber` that wants to act upon peer registration needs to
+        A :class:`~p2p.peer.PeerSubscriber` that wants to act upon peer registration needs to
        overwrite this method to provide an implementation.
        """
        pass

    @property
    def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
        if self._msg_queue is None:
-            self._msg_queue = asyncio.Queue(maxsize=10000)
-            # Add a _name to our msg_queue so that the stats logged by PeerPool are more useful.
-            self._msg_queue._name = self.__class__.__name__  # type: ignore
+            self._msg_queue = asyncio.Queue(maxsize=self.msg_queue_maxsize)
        return self._msg_queue

+    @property
+    def queue_size(self) -> int:
+        return self.msg_queue.qsize()
+
+    def add_msg(self, msg: 'PEER_MSG_TYPE') -> None:
+        peer, cmd, _ = msg
+        try:
+            self.logger.trace(  # type: ignore
+                "Adding %s msg from %s to queue; queue_size=%d", cmd, peer, self.queue_size)
+            self.msg_queue.put_nowait(msg)
+        except asyncio.queues.QueueFull:
+            self.logger.warn(  # type: ignore
+                "%s msg queue is full; discarding %s msg from %s",
+                self.__class__.__name__, cmd, peer)

    @contextlib.contextmanager
    def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]:
        peer_pool.subscribe(self)
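Illustration (not part of the diff): add_msg gives subscribers drop-rather-than-block semantics. On a bounded asyncio.Queue, put_nowait raises QueueFull as soon as the queue holds maxsize items, which add_msg turns into a warning instead of letting a slow subscriber stall the peer:

    import asyncio

    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    q.put_nowait("msg-1")
    q.put_nowait("msg-2")
    try:
        q.put_nowait("msg-3")  # queue already holds maxsize items
    except asyncio.QueueFull:
        # add_msg logs a warning at this point and drops the message
        print("queue full; discarding msg-3")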
@@ -662,7 +680,7 @@ def __init__(self,
self.vm_configuration = vm_configuration
self.max_peers = max_peers
self.connected_nodes: Dict[Node, BasePeer] = {}
-        self._subscribers: List[PeerPoolSubscriber] = []
+        self._subscribers: List[PeerSubscriber] = []

def __len__(self) -> int:
return len(self.connected_nodes)
@@ -680,17 +698,17 @@ def is_valid_connection_candidate(self, candidate: Node) -> bool:
matching_ip_nodes = nodes_by_ip.get(candidate.address.ip, [])
return len(matching_ip_nodes) <= 2

-    def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
+    def subscribe(self, subscriber: PeerSubscriber) -> None:
        self._subscribers.append(subscriber)
        for peer in self.connected_nodes.values():
            subscriber.register_peer(peer)
-            peer.add_subscriber(subscriber.msg_queue)
+            peer.add_subscriber(subscriber)

-    def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
+    def unsubscribe(self, subscriber: PeerSubscriber) -> None:
        if subscriber in self._subscribers:
            self._subscribers.remove(subscriber)
        for peer in self.connected_nodes.values():
-            peer.remove_subscriber(subscriber.msg_queue)
+            peer.remove_subscriber(subscriber)

async def start_peer(self, peer: BasePeer) -> None:
try:
@@ -711,15 +729,15 @@ def _add_peer(self,
"""Add the given peer to the pool.

Appart from adding it to our list of connected nodes and adding each of our subscriber's
msg_queue to the peer, we also add the given messages to our subscriber's queues.
to the peer, we also add the given messages to our subscriber's queues.
"""
self.logger.info('Adding %s to pool', peer)
self.connected_nodes[peer.remote] = peer
for subscriber in self._subscribers:
subscriber.register_peer(peer)
peer.add_subscriber(subscriber.msg_queue)
peer.add_subscriber(subscriber)
for cmd, msg in msgs:
subscriber.msg_queue.put_nowait((peer, cmd, msg))
subscriber.add_msg((peer, cmd, msg))

async def _run(self) -> None:
# FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
@@ -875,9 +893,9 @@ async def _periodically_report_stats(self) -> None:
msg = "%s: running=%s, subscribers=%d" % (peer, peer.is_running, subscribers)
if subscribers:
longest_queue = max(
peer._subscribers, key=operator.methodcaller('qsize'))
peer._subscribers, key=operator.attrgetter('queue_size'))
msg += " longest_subscriber_queue=%s(%d)" % (
longest_queue._name, longest_queue.qsize()) # type: ignore
longest_queue.__class__.__name__, longest_queue.queue_size)
self.logger.debug(msg)
self.logger.debug("== End peer details == ")
try:
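Illustration (not part of the diff): because subscribers now expose queue_size as a property rather than a qsize() method on a raw queue, the stats code selects the busiest subscriber with operator.attrgetter instead of operator.methodcaller. A small sketch with a hypothetical Sub class:

    import operator

    class Sub:
        def __init__(self, n: int) -> None:
            self._n = n

        @property
        def queue_size(self) -> int:
            # a property, so attrgetter (not methodcaller) is the right tool
            return self._n

    subs = [Sub(3), Sub(7), Sub(5)]
    longest = max(subs, key=operator.attrgetter("queue_size"))
    print(longest.queue_size)  # 7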
p2p/shard_syncer.py (11 changes: 9 additions & 2 deletions)
@@ -43,7 +43,7 @@
from p2p.service import BaseService
from p2p.peer import (
    PeerPool,
-    PeerPoolSubscriber,
+    PeerSubscriber,
)

from p2p.sharding_peer import (
@@ -68,7 +68,7 @@
COLLATION_PERIOD = 1


-class ShardSyncer(BaseService, PeerPoolSubscriber):
+class ShardSyncer(BaseService, PeerSubscriber):

def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) -> None:
super().__init__(token)
@@ -80,6 +80,13 @@ def __init__(self, shard: Shard, peer_pool: PeerPool, token: CancelToken=None) -

self.start_time = time.time()

+    @property
+    def msg_queue_maxsize(self) -> int:
+        # This is a rather arbitrary value, but when the sync is operating normally we never see
+        # the msg queue grow past a few hundred items, so this should be a reasonable limit for
+        # now.
+        return 2000

    async def _cleanup(self) -> None:
        pass

p2p/state.py (11 changes: 9 additions & 2 deletions)
@@ -42,7 +42,7 @@
from p2p.chain import PeerRequestHandler
from p2p.cancel_token import CancelToken
from p2p.exceptions import OperationCancelled
-from p2p.peer import ETHPeer, PeerPool, PeerPoolSubscriber
+from p2p.peer import ETHPeer, PeerPool, PeerSubscriber
from p2p.service import BaseService
from p2p.utils import get_process_pool_executor

@@ -51,7 +51,7 @@
from trinity.db.chain import AsyncChainDB # noqa: F401


-class StateDownloader(BaseService, PeerPoolSubscriber):
+class StateDownloader(BaseService, PeerSubscriber):
_pending_nodes: Dict[Any, float] = {}
_total_processed_nodes = 0
_report_interval = 10 # Number of seconds between progress reports.
@@ -74,6 +74,13 @@ def __init__(self,
self._peers_with_pending_requests: Dict[ETHPeer, float] = {}
self._executor = get_process_pool_executor()

+    @property
+    def msg_queue_maxsize(self) -> int:
+        # This is a rather arbitrary value, but when the sync is operating normally we never see
+        # the msg queue grow past a few hundred items, so this should be a reasonable limit for
+        # now.
+        return 2000

    async def _get_idle_peers(self) -> List[ETHPeer]:
        # FIXME: Should probably use get_peers() and pass the TD of our head? It's not really
        # necessary because peers that are behind us may very well have the trie nodes we want.
tests/p2p/peer_helpers.py (11 changes: 10 additions & 1 deletion)
@@ -3,6 +3,7 @@
from typing import List

from eth_hash.auto import keccak
+from eth.utils.logging import TraceLogger

from eth.chains.mainnet import MAINNET_GENESIS_HEADER
from eth.db.backends.memory import MemoryDB
@@ -12,7 +13,7 @@
from p2p import ecies
from p2p import kademlia
from p2p.cancel_token import CancelToken
-from p2p.peer import BasePeer, LESPeer, PeerPool
+from p2p.peer import BasePeer, LESPeer, PeerPool, PeerSubscriber
from p2p.server import decode_authentication

from integration_test_helpers import FakeAsyncHeaderDB
@@ -160,3 +161,11 @@ def __init__(self, peers: List[BasePeer]) -> None:

async def _run(self) -> None:
raise NotImplementedError("This is a mock PeerPool implementation, you must not _run() it")


+class SamplePeerSubscriber(PeerSubscriber):
+    logger = TraceLogger("")
+
+    @property
+    def msg_queue_maxsize(self) -> int:
+        return 100
tests/p2p/test_sharding.py (15 changes: 8 additions & 7 deletions)
@@ -44,6 +44,7 @@
get_directly_linked_peers,
get_directly_linked_peers_without_handshake,
MockPeerPoolWithConnectedPeers,
+    SamplePeerSubscriber,
)

from cytoolz import (
@@ -203,7 +204,7 @@ async def test_new_collations_notification(request, event_loop):
# setup a-b-c topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
peer_b_c, peer_c_b = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_c_b_subscriber = asyncio.Queue()
+    peer_c_b_subscriber = SamplePeerSubscriber()
peer_c_b.add_subscriber(peer_c_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a, peer_b_c])

@@ -223,7 +224,7 @@ def finalizer():
c1 = next(collations)
peer_a_b.sub_proto.send_collations(0, [c1])
peer, cmd, msg = await asyncio.wait_for(
-        peer_c_b_subscriber.get(),
+        peer_c_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_c_b
@@ -234,7 +235,7 @@ def finalizer():
c2 = next(collations)
peer_a_b.sub_proto.send_collations(0, [c1, c2])
peer, cmd, msg = await asyncio.wait_for(
-        peer_c_b_subscriber.get(),
+        peer_c_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_c_b
@@ -246,7 +247,7 @@
async def test_syncer_requests_new_collations(request, event_loop):
# setup a-b topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_a_b_subscriber = asyncio.Queue()
+    peer_a_b_subscriber = SamplePeerSubscriber()
peer_a_b.add_subscriber(peer_a_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])

@@ -266,7 +267,7 @@ def finalizer():
hashes_and_periods = ((b"\xaa" * 32, 0),)
peer_a_b.sub_proto.send_new_collation_hashes(hashes_and_periods)
peer, cmd, msg = await asyncio.wait_for(
-        peer_a_b_subscriber.get(),
+        peer_a_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_a_b
@@ -278,7 +279,7 @@
async def test_syncer_proposing(request, event_loop):
# setup a-b topology
peer_a_b, peer_b_a = await get_directly_linked_sharding_peers(request, event_loop)
-    peer_a_b_subscriber = asyncio.Queue()
+    peer_a_b_subscriber = SamplePeerSubscriber()
peer_a_b.add_subscriber(peer_a_b_subscriber)
peer_pool_b = MockPeerPoolWithConnectedPeers([peer_b_a])

@@ -297,7 +298,7 @@ def finalizer():
# propose at b and check that it announces its proposal
await syncer.propose()
peer, cmd, msg = await asyncio.wait_for(
-        peer_a_b_subscriber.get(),
+        peer_a_b_subscriber.msg_queue.get(),
timeout=1,
)
assert peer == peer_a_b