Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
A few improvements to how Connection behaviors are handled
Browse files Browse the repository at this point in the history
These will hopefully make it less likely for us to introduce new code
that leaks PeerConnectionLost exceptions

- Behaviors are now applied/monitored by Connection, ensuring they're
  applied before a Peer starts and only removed after the Peer terminates

- Do not use cached_properties for Peer.head_info & co as if they are
  called while the Peer is being terminated (e.g. as the connection
  tracker does) they can raise a PeerConnectionLost/UnknownAPI

- ChainInfoAPI attributes no longer raise PeerConnectionLost

- Peer will not attempt to send a Disconnect if it's crashed before
  the self._p2_api attribute has been set
  • Loading branch information
gsalgado committed Jul 16, 2020
1 parent a2a9911 commit 65bf6ac
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 184 deletions.
8 changes: 8 additions & 0 deletions p2p/abc.py
Expand Up @@ -594,6 +594,13 @@ class BehaviorAPI(ABC):
def should_apply_to(self, connection: 'ConnectionAPI') -> bool:
...

@abstractmethod
def post_apply(self) -> None:
"""
Called after all behaviors have been applied to the Connection.
"""
...

@abstractmethod
@contextlib.asynccontextmanager
def apply(self, connection: 'ConnectionAPI') -> AsyncIterator[asyncio.Future[None]]:
Expand Down Expand Up @@ -624,6 +631,7 @@ class ConnectionAPI(ServiceAPI):
#
# Primary properties of the connection
#
behaviors_applied: asyncio.Event
is_dial_out: bool

@property
Expand Down
3 changes: 3 additions & 0 deletions p2p/behaviors.py
Expand Up @@ -37,6 +37,9 @@ def should_apply_to(self, connection: 'ConnectionAPI') -> bool:
# mypy bug: https://github.com/python/mypy/issues/708
return self.qualifier(connection, self.logic) # type: ignore

def post_apply(self) -> None:
self.logic.post_apply()

@contextlib.asynccontextmanager
async def apply(self, connection: ConnectionAPI) -> AsyncIterator[asyncio.Future[None]]:
if self._applied_to is not None:
Expand Down
31 changes: 31 additions & 0 deletions p2p/connection.py
@@ -1,10 +1,12 @@
import asyncio
import collections
import contextlib
import functools
from typing import (
Any,
DefaultDict,
Dict,
List,
Sequence,
Set,
Tuple,
Expand All @@ -20,6 +22,7 @@
from eth_keys import keys

from p2p.abc import (
BehaviorAPI,
CommandAPI,
ConnectionAPI,
HandlerFn,
Expand All @@ -45,6 +48,7 @@
UnknownProtocol,
UnknownProtocolCommand,
)
from p2p.logic import wait_first
from p2p.subscription import Subscription
from p2p.p2p_proto import BaseP2PProtocol, DevP2PReceipt, Disconnect
from p2p.typing import Capabilities
Expand Down Expand Up @@ -91,6 +95,8 @@ def __init__(self,
# before all necessary handlers have been added
self._handlers_ready = asyncio.Event()

self.behaviors_applied = asyncio.Event()

self._logics = {}

def __str__(self) -> str:
Expand All @@ -102,13 +108,36 @@ def __repr__(self) -> str:
def start_protocol_streams(self) -> None:
self._handlers_ready.set()

async def run_behaviors(self, behaviors: Tuple[BehaviorAPI, ...]) -> None:
async with contextlib.AsyncExitStack() as stack:
futures: List[asyncio.Future[None]] = [
asyncio.create_task(self.manager.wait_finished())]
for behavior in behaviors:
if behavior.should_apply_to(self):
behavior_exit = await stack.enter_async_context(behavior.apply(self))
futures.append(behavior_exit)

self.behaviors_applied.set()
try:
for behavior in behaviors:
behavior.post_apply()
await wait_first(futures)
except PeerConnectionLost:
# Any of our behaviors may propagate a PeerConnectionLost, which is to be expected
# as many Connection APIs used by them can raise that. To avoid a DaemonTaskExit
# since we're returning silently, ensure we're cancelled.
self.manager.cancel()

async def run_peer(self, peer: 'BasePeer') -> None:
"""
Run the peer as a child service.
A peer must always run as a child of the connection so that it has an open connection
until it finishes its cleanup.
"""
self.manager.run_daemon_task(self.run_behaviors, peer.get_behaviors())
await self.behaviors_applied.wait()

self.manager.run_daemon_child_service(peer)
await asyncio.wait_for(peer.manager.wait_started(), timeout=PEER_READY_TIMEOUT)
await asyncio.wait_for(peer.ready.wait(), timeout=PEER_READY_TIMEOUT)
Expand Down Expand Up @@ -277,6 +306,8 @@ def remove_logic(self, name: str) -> None:

def has_logic(self, name: str) -> bool:
if self.is_closing:
# This is a safety net, really, as the Peer should never call this if it is no longer
# alive.
raise PeerConnectionLost("Cannot look up subprotocol when connection is closing")
return name in self._logics

Expand Down
8 changes: 4 additions & 4 deletions p2p/kademlia.py
Expand Up @@ -322,22 +322,22 @@ def update(self, node_id: NodeID) -> NodeID:
is_node_in_bucket = node_id in bucket

if not is_node_in_bucket and not is_bucket_full:
self.logger.debug("Adding %s to bucket %d", encode_hex(node_id), bucket_index)
self.logger.debug2("Adding %s to bucket %d", encode_hex(node_id), bucket_index)
self.update_bucket_unchecked(node_id)
eviction_candidate = None
elif is_node_in_bucket:
self.logger.debug("Updating %s in bucket %d", encode_hex(node_id), bucket_index)
self.logger.debug2("Updating %s in bucket %d", encode_hex(node_id), bucket_index)
self.update_bucket_unchecked(node_id)
eviction_candidate = None
elif not is_node_in_bucket and is_bucket_full:
if node_id not in replacement_cache:
self.logger.debug(
self.logger.debug2(
"Adding %s to replacement cache of bucket %d",
encode_hex(node_id),
bucket_index,
)
else:
self.logger.debug(
self.logger.debug2(
"Updating %s in replacement cache of bucket %d",
encode_hex(node_id),
bucket_index,
Expand Down
6 changes: 5 additions & 1 deletion p2p/logic.py
Expand Up @@ -39,6 +39,9 @@ def as_behavior(self, qualifier: QualifierFn = None) -> BehaviorAPI:
qualifier = self.qualifier # type: ignore
return Behavior(qualifier, self)

def post_apply(self) -> None:
pass


class CommandHandler(BaseLogic, Generic[TCommand]):
"""
Expand Down Expand Up @@ -90,7 +93,8 @@ async def wait_first(futures: Sequence[asyncio.Future[None]]) -> None:
await cancel_futures(futures)
raise
else:
await cancel_futures(pending)
if pending:
await cancel_futures(pending)
if len(done) != 1:
raise Exception(
"Invariant: asyncio.wait() returned more than one future even "
Expand Down
97 changes: 56 additions & 41 deletions p2p/peer.py
Expand Up @@ -19,7 +19,7 @@
TYPE_CHECKING,
)

from async_service import Service
from async_service import LifecycleError, Service

from lahja import EndpointAPI

Expand Down Expand Up @@ -55,7 +55,6 @@
DevP2PHandshakeParams,
)
from p2p.logging import loggable
from p2p.logic import wait_first
from p2p.p2p_api import P2PAPI
from p2p.p2p_proto import BaseP2PProtocol, Disconnect
from p2p.tracking.connection import (
Expand Down Expand Up @@ -115,7 +114,7 @@ class BasePeer(Service):
_event_bus: EndpointAPI = None

base_protocol: BaseP2PProtocol
p2p_api: P2PAPI
_p2p_api: P2PAPI

def __init__(self,
connection: ConnectionAPI,
Expand Down Expand Up @@ -166,6 +165,9 @@ def __init__(self,
# been installed to the connection.
self.ready = asyncio.Event()

def _pre_run(self) -> None:
self._p2p_api = self.connection.get_logic('p2p', P2PAPI)

@property
def uptime(self) -> float:
if self._start_time is None:
Expand All @@ -183,7 +185,7 @@ def process_handshake_receipts(self) -> None:
pass

def get_behaviors(self) -> Tuple[BehaviorAPI, ...]:
return ()
return (P2PAPI().as_behavior(),)

@cached_property
def has_event_bus(self) -> bool:
Expand Down Expand Up @@ -252,52 +254,53 @@ def setup_protocol_handlers(self) -> None:
pass

async def _handle_disconnect(self, connection: ConnectionAPI, cmd: Disconnect) -> None:
self.p2p_api.remote_disconnect_reason = cmd.payload
self._p2p_api.remote_disconnect_reason = cmd.payload
# We run as a daemon child of the connection, so cancel the connection instead of
# ourselves to ensure asyncio-service doesn't think we're exiting when the connection is
# still active, as that would cause a DaemonTaskExit.
self.connection.get_manager().cancel()

@property
def is_alive(self) -> bool:
# We need this because when a remote disconnects from us the connection may be closed
# before the Disconnect msg is processed and cancels ourselves.
if not hasattr(self, 'manager'):
return False
return self.manager.is_running and not self.connection.is_closing

async def run(self) -> None:
self._start_time = time.monotonic()
self.connection.add_command_handler(Disconnect, cast(HandlerFn, self._handle_disconnect))
if not self.connection.behaviors_applied.is_set():
raise LifecycleError("Cannot run peer when behaviors haven't been applied")

try:
async with contextlib.AsyncExitStack() as stack:
fut = await stack.enter_async_context(P2PAPI().as_behavior().apply(self.connection))
futures = [fut]
self.p2p_api = self.connection.get_logic('p2p', P2PAPI)

for behavior in self.get_behaviors():
if behavior.should_apply_to(self.connection):
future = await stack.enter_async_context(behavior.apply(self.connection))
futures.append(future)

self.connection.add_msg_handler(self._handle_subscriber_message)

self.setup_protocol_handlers()

# The `boot` process is run in the background to allow the `run` loop
# to continue so that all of the Peer APIs can be used within the
# `boot` task.
self.manager.run_child_service(self.boot_manager)

# Trigger the connection to start feeding messages though the handlers
self.connection.start_protocol_streams()
self.ready.set()

try:
await wait_first(futures)
except asyncio.CancelledError:
raise
except BaseException:
self.logger.exception("Behavior finished before us, cancelling ourselves")
self.manager.cancel()
self._start_time = time.monotonic()
self._pre_run()

self.connection.add_command_handler(
Disconnect, cast(HandlerFn, self._handle_disconnect))

self.connection.add_msg_handler(self._handle_subscriber_message)

self.setup_protocol_handlers()

# The `boot` process is run in the background to allow the `run` loop
# to continue so that all of the Peer APIs can be used within the
# `boot` task.
self.manager.run_child_service(self.boot_manager)

# Trigger the connection to start feeding messages though the handlers
self.connection.start_protocol_streams()
self.ready.set()

await self.manager.wait_finished()
finally:
for callback in self._finished_callbacks:
callback(self)
if hasattr(self, 'p2p_api'):
if (self.p2p_api.local_disconnect_reason is None and
self.p2p_api.remote_disconnect_reason is None):
# We may have crashed before setting self._p2p_api; in that case don't attempt to send
# a disconnect.
if hasattr(self, '_p2p_api'):
if (self.local_disconnect_reason is None and
self.remote_disconnect_reason is None):
self._send_disconnect(DisconnectReason.CLIENT_QUITTING)
# We run as a child service of the connection, but we don't want to leave a connection
# open if somebody cancels just us, so this ensures the connection gets closed as well.
Expand Down Expand Up @@ -332,10 +335,22 @@ def disconnect_nowait(self, reason: DisconnectReason) -> None:

def _send_disconnect(self, reason: DisconnectReason) -> None:
try:
self.p2p_api.disconnect(reason)
self._p2p_api.disconnect(reason)
except PeerConnectionLost:
self.logger.debug("Tried to disconnect from %s, but already disconnected", self)

@property
def safe_client_version_string(self) -> str:
return self._p2p_api.safe_client_version_string

@property
def local_disconnect_reason(self) -> DisconnectReason:
return self._p2p_api.local_disconnect_reason

@property
def remote_disconnect_reason(self) -> DisconnectReason:
return self._p2p_api.remote_disconnect_reason


class PeerMessage(NamedTuple):
peer: BasePeer
Expand Down
6 changes: 3 additions & 3 deletions p2p/peer_pool.py
Expand Up @@ -477,8 +477,8 @@ def _peer_finished(self, peer: BasePeer) -> None:
self.logger.debug(
"Removing %s from pool: local_reason=%s remote_reason=%s",
peer,
peer.p2p_api.local_disconnect_reason,
peer.p2p_api.remote_disconnect_reason,
peer.local_disconnect_reason,
peer.remote_disconnect_reason,
)
self.connected_nodes.pop(peer.session)
else:
Expand Down Expand Up @@ -537,7 +537,7 @@ async def _periodically_report_stats(self) -> None:
)
self.logger.debug(
"client_version_string='%s'",
peer.p2p_api.safe_client_version_string,
peer.safe_client_version_string,
)
try:
for line in peer.get_extra_stats():
Expand Down
16 changes: 9 additions & 7 deletions tests/p2p/test_peer.py
Expand Up @@ -22,7 +22,7 @@ async def _handle_disconnect(conn, cmd):
bob.connection.add_command_handler(Disconnect, _handle_disconnect)
await alice.manager.stop()
await asyncio.wait_for(got_disconnect.wait(), timeout=1)
assert bob.p2p_api.remote_disconnect_reason == DisconnectReason.CLIENT_QUITTING
assert bob.remote_disconnect_reason == DisconnectReason.CLIENT_QUITTING


@pytest.mark.asyncio
Expand All @@ -40,10 +40,10 @@ async def test_cancels_on_received_disconnect():
# cancel itself even if alice accidentally leaves her connection open. If we used
# alice.cancel() to send the Disconnect msg, alice would also close its connection,
# causing bob to detect it, close its own and cause the peer to be cancelled.
alice.p2p_api.disconnect(DisconnectReason.CLIENT_QUITTING)
alice._p2p_api.disconnect(DisconnectReason.CLIENT_QUITTING)
await asyncio.wait_for(bob.connection.manager.wait_finished(), timeout=1)
assert bob.connection.is_closing
assert bob.p2p_api.remote_disconnect_reason == DisconnectReason.CLIENT_QUITTING
assert bob.remote_disconnect_reason == DisconnectReason.CLIENT_QUITTING


class BehaviorCrash(Exception):
Expand All @@ -60,12 +60,14 @@ async def apply(self, connection):


@pytest.mark.asyncio
async def test_stops_if_behavior_crashes(monkeypatch):
async def test_propagates_behavior_crashes(monkeypatch):

def init(self):
self.add_child_behavior(CrashingLogic().as_behavior(always))

monkeypatch.setattr(ParagonAPI, '__init__', init)
async with ParagonPeerPairFactory() as (alice, _):
await asyncio.wait_for(alice.ready.wait(), timeout=0.5)
assert alice.manager.is_cancelled
with pytest.raises(BehaviorCrash):
async with ParagonPeerPairFactory() as (alice, _):
await asyncio.wait_for(alice.manager.wait_finished(), timeout=0.5)

assert alice.manager.is_cancelled

0 comments on commit 65bf6ac

Please sign in to comment.