Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Add partial support for admin_peers JSON-RPC API #1491

Merged
merged 1 commit into from Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/1491.feature.rst
@@ -0,0 +1 @@
Implement ``admin_peers`` JSON-RPC API
52 changes: 52 additions & 0 deletions tests/core/json-rpc/test_ipc.py
Expand Up @@ -36,10 +36,15 @@
SyncProgress
)
from trinity.tools.event_bus import mock_request_response
from trinity.tools.factories import ETHPeerPairFactory

from trinity._utils.version import construct_trinity_client_identifier


from tests.core.integration_test_helpers import run_peer_pool_event_server
from tests.core.peer_helpers import MockPeerPoolWithConnectedPeers


def wait_for(path):
for _ in range(100):
if os.path.exists(path):
Expand Down Expand Up @@ -645,6 +650,53 @@ async def test_admin_addPeer_fires_message(
assert event.remote.uri() == enode


@pytest.mark.asyncio
async def test_admin_peers(
jsonrpc_ipc_pipe_path,
event_loop,
event_bus,
ipc_server):

async with ETHPeerPairFactory() as (alice, bob):
peer_pool = MockPeerPoolWithConnectedPeers([alice, bob], event_bus=event_bus)

async with run_peer_pool_event_server(event_bus, peer_pool):

request = build_request('admin_peers')

result = await get_ipc_response(
jsonrpc_ipc_pipe_path,
request,
event_loop,
event_bus
)

peers = result['result']
json_bob = peers[0]
json_alice = peers[1]

def to_remote_address(session):
return f"{session.remote.address.ip}:{session.remote.address.tcp_port}"

assert json_bob['caps'] == ['eth/63', 'eth/64']
assert json_bob['enode'] == alice.connection.session.remote.uri()
assert json_bob['id'] == str(alice.connection.session.id)
assert json_bob['name'] == 'bob'
bob_network = json_bob['network']
assert not bob_network['inbound']
assert bob_network['localAddress'] == '0.0.0.0:30303'
assert bob_network['remoteAddress'] == to_remote_address(alice.connection.session)

assert json_alice['caps'] == ['eth/63', 'eth/64']
assert json_alice['enode'] == bob.connection.session.remote.uri()
assert json_alice['id'] == str(bob.connection.session.id)
assert json_alice['name'] == 'alice'
alice_network = json_alice['network']
assert alice_network['inbound']
assert alice_network['localAddress'] == '0.0.0.0:30303'
assert alice_network['remoteAddress'] == to_remote_address(bob.connection.session)


@pytest.fixture
def ipc_request(jsonrpc_ipc_pipe_path, event_loop, event_bus, ipc_server):
async def make_request(*args):
Expand Down
15 changes: 7 additions & 8 deletions tests/core/p2p-proto/test_base_proxy_peer_pool.py
Expand Up @@ -8,14 +8,13 @@
from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.protocol.common.events import (
GetConnectedPeersRequest,
GetConnectedPeersResponse,
PeerJoinedEvent,
PeerLeftEvent,
)
from trinity.tools.event_bus import mock_request_response

from trinity.protocol.eth.peer import ETHProxyPeerPool

from trinity.tools.factories.events import GetConnectedPeersResponseFactory

TEST_NODES = tuple(SessionFactory.create_batch(4))

Expand All @@ -28,8 +27,8 @@ async def test_can_instantiate_proxy_pool(event_bus):
@pytest.mark.parametrize(
"response, expected_count",
(
(GetConnectedPeersResponse(tuple()), 0),
(GetConnectedPeersResponse(TEST_NODES), 4),
(GetConnectedPeersResponseFactory.from_sessions(tuple()), 0),
(GetConnectedPeersResponseFactory.from_sessions(TEST_NODES), 4),
),
)
@pytest.mark.asyncio
Expand All @@ -47,8 +46,8 @@ async def test_fetch_initial_peers(event_bus, response, expected_count):
@pytest.mark.parametrize(
"response, expected_count",
(
(GetConnectedPeersResponse(tuple()), 0),
(GetConnectedPeersResponse(TEST_NODES), 4),
(GetConnectedPeersResponseFactory.from_sessions(tuple()), 0),
(GetConnectedPeersResponseFactory.from_sessions(TEST_NODES), 4),
),
)
@pytest.mark.asyncio
Expand All @@ -67,7 +66,7 @@ async def test_adds_new_peers(event_bus):

do_mock = mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse((TEST_NODES[0],)),
GetConnectedPeersResponseFactory.from_sessions((TEST_NODES[0],)),
event_bus,
)
async with do_mock:
Expand All @@ -87,7 +86,7 @@ async def test_adds_new_peers(event_bus):
async def test_removes_peers(event_bus):
do_mock = mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(TEST_NODES[:2]),
GetConnectedPeersResponseFactory.from_sessions(TEST_NODES[:2]),
event_bus,
)

Expand Down
6 changes: 3 additions & 3 deletions tests/core/tx-pool/test_tx_pool.py
Expand Up @@ -20,7 +20,6 @@
)
from trinity.protocol.common.events import (
GetConnectedPeersRequest,
GetConnectedPeersResponse,
)
from trinity.protocol.eth.commands import (
Transactions
Expand All @@ -30,6 +29,7 @@
SendTransactionsEvent,
)
from trinity.tools.event_bus import mock_request_response
from trinity.tools.factories.events import GetConnectedPeersResponseFactory

from trinity.protocol.eth.peer import (
ETHProxyPeerPool,
Expand Down Expand Up @@ -71,7 +71,7 @@ async def test_tx_propagation(event_bus,
async with AsyncExitStack() as stack:
await stack.enter_async_context(mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(initial_two_peers),
GetConnectedPeersResponseFactory.from_sessions(initial_two_peers),
event_bus,
))

Expand Down Expand Up @@ -155,7 +155,7 @@ async def test_does_not_propagate_invalid_tx(event_bus,
async with AsyncExitStack() as stack:
await stack.enter_async_context(mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(initial_two_peers),
GetConnectedPeersResponseFactory.from_sessions(initial_two_peers),
event_bus,
))

Expand Down
23 changes: 22 additions & 1 deletion trinity/protocol/common/events.py
Expand Up @@ -5,6 +5,8 @@
Any,
Tuple,
Type,
NamedTuple,
Dict,
)

from lahja import (
Expand All @@ -14,6 +16,7 @@

from p2p.abc import CommandAPI, NodeAPI, SessionAPI
from p2p.disconnect import DisconnectReason
from p2p.peer import BasePeer
from p2p.typing import Capabilities


Expand Down Expand Up @@ -69,10 +72,28 @@ class PeerLeftEvent(BaseEvent):
session: SessionAPI


class PeerInfo(NamedTuple):
session: SessionAPI
capabilities: Capabilities
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol, it looks like you implemented pretty much the exact thing I typed up 👍

client_version_string: str
inbound: bool


@dataclass
class GetConnectedPeersResponse(BaseEvent):

sessions: Tuple[SessionAPI, ...]
peers: Tuple[PeerInfo, ...]

@staticmethod
def from_connected_nodes(peers: Dict[SessionAPI, BasePeer]) -> 'GetConnectedPeersResponse':
return GetConnectedPeersResponse(tuple(
PeerInfo(
session=session,
capabilities=peer.connection.remote_capabilities,
client_version_string=peer.connection.safe_client_version_string,
inbound=peer.inbound
) for session, peer in peers.items()
))


class GetConnectedPeersRequest(BaseRequestResponseEvent[GetConnectedPeersResponse]):
Expand Down
10 changes: 5 additions & 5 deletions trinity/protocol/common/peer_pool_event_bus.py
Expand Up @@ -45,7 +45,7 @@
PeerCountResponse,
PeerJoinedEvent,
PeerLeftEvent,
ProtocolCapabilitiesResponse
ProtocolCapabilitiesResponse,
)
from .peer import BaseProxyPeer

Expand Down Expand Up @@ -157,7 +157,7 @@ async def handle_peer_count_requests(self) -> None:
async def handle_get_connected_peers_requests(self) -> None:
async for req in self.event_bus.stream(GetConnectedPeersRequest):
await self.event_bus.broadcast(
GetConnectedPeersResponse(tuple(self.peer_pool.connected_nodes.keys())),
GetConnectedPeersResponse.from_connected_nodes(self.peer_pool.connected_nodes),
req.broadcast_config()
)

Expand Down Expand Up @@ -304,9 +304,9 @@ async def fetch_initial_peers(self) -> Tuple[TProxyPeer, ...]:
)

return tuple([
await self.ensure_proxy_peer(session)
for session
in response.sessions
await self.ensure_proxy_peer(peer_info.session)
for peer_info
in response.peers
])

async def get_peers(self) -> Tuple[TProxyPeer, ...]:
Expand Down
32 changes: 30 additions & 2 deletions trinity/rpc/modules/admin.py
@@ -1,8 +1,10 @@
from typing import Tuple, Iterable, Dict

from typing import Tuple, Iterable, Dict, Sequence

from eth.constants import GENESIS_BLOCK_NUMBER
from eth_typing import BlockNumber
from eth_utils import encode_hex, to_dict

from lahja import EndpointAPI

from p2p.kademlia import Node
Expand All @@ -14,11 +16,14 @@
from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.protocol.common.events import (
ConnectToNodeCommand,
GetProtocolCapabilitiesRequest
GetConnectedPeersRequest,
GetProtocolCapabilitiesRequest,
PeerInfo,
)
from trinity.rpc.modules import Eth1ChainRPCModule
from trinity.rpc.typing import RpcProtocolResponse, RpcNodeInfoResponse
from trinity.server import BOUND_IP
from trinity.rpc.typing import RpcPeerResponse
from trinity._utils.version import construct_trinity_client_identifier


Expand Down Expand Up @@ -91,3 +96,26 @@ async def _generate_protocol_info(
}
for protocol, version in protocols
}

def _format_peer(self, peer_info: PeerInfo) -> RpcPeerResponse:
session = peer_info.session
return {
'enode': session.remote.uri(),
'id': str(session.id),
'name': peer_info.client_version_string,
'caps': [f"{protocol}/{version}" for protocol, version in peer_info.capabilities],
'network': {
'localAddress': f"{BOUND_IP}:{self.trinity_config.port}",
'remoteAddress': f"{session.remote.address.ip}:{session.remote.address.tcp_port}",
'inbound': peer_info.inbound
}
}

async def peers(self) -> Sequence[RpcPeerResponse]:

response = await self.event_bus.request(GetConnectedPeersRequest())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pipermerriam So, our GetConnectedPeersResponse contains SessionAPIs now but all the interesting stuff that geth prints out is sitting on the peer. The obvious answer would be to change to not return a list of SessionAPI but something that includes more information (a subset of the actual peer) but I know you've been really deep into the p2p stack the last months and formed an idea of how these things could work in the future so I'm reaching out to here your thoughts before I just hack myself through until I have the missing data :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be very reasonable to have a new object returned that aggregates information about the session like:

class PeerInfo(NamedTuple):
    session: SessionAPI
    capabilities: Capabilities
    ...


return tuple(
self._format_peer(peer_info)
for peer_info in response.peers
)
14 changes: 14 additions & 0 deletions trinity/rpc/typing.py
Expand Up @@ -30,6 +30,20 @@ class RpcNodeInfoResponse(TypedDict):
protocols: Dict[str, RpcProtocolResponse]


class RpcPeerNetworkResponse(TypedDict):
localAddress: str
remoteAddress: str
inbound: bool


class RpcPeerResponse(TypedDict):
enode: str
id: str
name: str
caps: Sequence[str]
network: RpcPeerNetworkResponse


RpcTransactionResponse = TypedDict('RpcTransactionResponse', {
'hash': HexStr,
'nonce': str,
Expand Down
32 changes: 32 additions & 0 deletions trinity/tools/factories/events.py
@@ -0,0 +1,32 @@
from typing import Sequence, Any

from p2p.abc import SessionAPI

try:
import factory
except ImportError:
raise ImportError(
"The p2p.tools.factories module requires the `factory_boy` and `faker` libraries."
)


from trinity.protocol.common.events import GetConnectedPeersResponse, PeerInfo


class GetConnectedPeersResponseFactory(factory.Factory):
class Meta:
model = GetConnectedPeersResponse

@classmethod
def from_sessions(cls,
sessions: Sequence[SessionAPI],
*args: Any,
**kwargs: Any) -> GetConnectedPeersResponse:
return GetConnectedPeersResponse(tuple(
PeerInfo(
session=session,
capabilities=(),
client_version_string='unknown',
inbound=False
) for session in sessions
))