Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Add partial support for admin_peers JSON-RPC API
Browse files Browse the repository at this point in the history
  • Loading branch information
cburgdorf committed Mar 19, 2020
1 parent 6c242fb commit 90e6397
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 19 deletions.
1 change: 1 addition & 0 deletions newsfragments/1491.feature.rst
@@ -0,0 +1 @@
Implement ``admin_peers`` JSON-RPC API
53 changes: 53 additions & 0 deletions tests/core/json-rpc/test_ipc.py
Expand Up @@ -19,6 +19,7 @@
to_int,
)


from trinity.nodes.events import (
NetworkIdRequest,
NetworkIdResponse,
Expand All @@ -36,10 +37,15 @@
SyncProgress
)
from trinity.tools.event_bus import mock_request_response
from trinity.tools.factories import ETHPeerPairFactory

from trinity._utils.version import construct_trinity_client_identifier


from tests.core.integration_test_helpers import run_peer_pool_event_server
from tests.core.peer_helpers import MockPeerPoolWithConnectedPeers


def wait_for(path):
for _ in range(100):
if os.path.exists(path):
Expand Down Expand Up @@ -645,6 +651,53 @@ async def test_admin_addPeer_fires_message(
assert event.remote.uri() == enode


@pytest.mark.asyncio
async def test_admin_peers(
jsonrpc_ipc_pipe_path,
event_loop,
event_bus,
ipc_server):

async with ETHPeerPairFactory() as (alice, bob):
peer_pool = MockPeerPoolWithConnectedPeers([alice, bob], event_bus=event_bus)

async with run_peer_pool_event_server(event_bus, peer_pool):

request = build_request('admin_peers')

result = await get_ipc_response(
jsonrpc_ipc_pipe_path,
request,
event_loop,
event_bus
)

peers = result['result']
json_bob = peers[0]
json_alice = peers[1]

def to_remote_address(session):
return f"{session.remote.address.ip}:{session.remote.address.tcp_port}"

assert json_bob['caps'] == ['eth/63', 'eth/64']
assert json_bob['enode'] == alice.connection.session.remote.uri()
assert json_bob['id'] == str(alice.connection.session.id)
assert json_bob['name'] == 'bob'
bob_network = json_bob['network']
assert not bob_network['inbound']
assert bob_network['localAddress'] == '0.0.0.0:30303'
assert bob_network['remoteAddress'] == to_remote_address(alice.connection.session)

assert json_alice['caps'] == ['eth/63', 'eth/64']
assert json_alice['enode'] == bob.connection.session.remote.uri()
assert json_alice['id'] == str(bob.connection.session.id)
assert json_alice['name'] == 'alice'
alice_network = json_alice['network']
assert alice_network['inbound']
assert alice_network['localAddress'] == '0.0.0.0:30303'
assert alice_network['remoteAddress'] == to_remote_address(bob.connection.session)


@pytest.fixture
def ipc_request(jsonrpc_ipc_pipe_path, event_loop, event_bus, ipc_server):
async def make_request(*args):
Expand Down
16 changes: 8 additions & 8 deletions tests/core/p2p-proto/test_base_proxy_peer_pool.py
@@ -1,4 +1,5 @@
import asyncio

import pytest

from p2p.tools.factories import SessionFactory
Expand All @@ -8,14 +9,13 @@
from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.protocol.common.events import (
GetConnectedPeersRequest,
GetConnectedPeersResponse,
PeerJoinedEvent,
PeerLeftEvent,
)
from trinity.tools.event_bus import mock_request_response

from trinity.protocol.eth.peer import ETHProxyPeerPool

from trinity.tools.factories.events import GetConnectedPeersResponseFactory

TEST_NODES = tuple(SessionFactory.create_batch(4))

Expand All @@ -28,8 +28,8 @@ async def test_can_instantiate_proxy_pool(event_bus):
@pytest.mark.parametrize(
"response, expected_count",
(
(GetConnectedPeersResponse(tuple()), 0),
(GetConnectedPeersResponse(TEST_NODES), 4),
(GetConnectedPeersResponseFactory.from_sessions(tuple()), 0),
(GetConnectedPeersResponseFactory.from_sessions(TEST_NODES), 4),
),
)
@pytest.mark.asyncio
Expand All @@ -47,8 +47,8 @@ async def test_fetch_initial_peers(event_bus, response, expected_count):
@pytest.mark.parametrize(
"response, expected_count",
(
(GetConnectedPeersResponse(tuple()), 0),
(GetConnectedPeersResponse(TEST_NODES), 4),
(GetConnectedPeersResponseFactory.from_sessions(tuple()), 0),
(GetConnectedPeersResponseFactory.from_sessions(TEST_NODES), 4),
),
)
@pytest.mark.asyncio
Expand All @@ -67,7 +67,7 @@ async def test_adds_new_peers(event_bus):

do_mock = mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse((TEST_NODES[0],)),
GetConnectedPeersResponseFactory.from_sessions((TEST_NODES[0],)),
event_bus,
)
async with do_mock:
Expand All @@ -87,7 +87,7 @@ async def test_adds_new_peers(event_bus):
async def test_removes_peers(event_bus):
do_mock = mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(TEST_NODES[:2]),
GetConnectedPeersResponseFactory.from_sessions(TEST_NODES[:2]),
event_bus,
)

Expand Down
6 changes: 3 additions & 3 deletions tests/core/tx-pool/test_tx_pool.py
Expand Up @@ -20,7 +20,6 @@
)
from trinity.protocol.common.events import (
GetConnectedPeersRequest,
GetConnectedPeersResponse,
)
from trinity.protocol.eth.commands import (
Transactions
Expand All @@ -30,6 +29,7 @@
SendTransactionsEvent,
)
from trinity.tools.event_bus import mock_request_response
from trinity.tools.factories.events import GetConnectedPeersResponseFactory

from trinity.protocol.eth.peer import (
ETHProxyPeerPool,
Expand Down Expand Up @@ -71,7 +71,7 @@ async def test_tx_propagation(event_bus,
async with AsyncExitStack() as stack:
await stack.enter_async_context(mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(initial_two_peers),
GetConnectedPeersResponseFactory.from_sessions(initial_two_peers),
event_bus,
))

Expand Down Expand Up @@ -155,7 +155,7 @@ async def test_does_not_propagate_invalid_tx(event_bus,
async with AsyncExitStack() as stack:
await stack.enter_async_context(mock_request_response(
GetConnectedPeersRequest,
GetConnectedPeersResponse(initial_two_peers),
GetConnectedPeersResponseFactory.from_sessions(initial_two_peers),
event_bus,
))

Expand Down
23 changes: 22 additions & 1 deletion trinity/protocol/common/events.py
Expand Up @@ -5,6 +5,8 @@
Any,
Tuple,
Type,
NamedTuple,
Dict,
)

from lahja import (
Expand All @@ -14,6 +16,7 @@

from p2p.abc import CommandAPI, NodeAPI, SessionAPI
from p2p.disconnect import DisconnectReason
from p2p.peer import BasePeer
from p2p.typing import Capabilities


Expand Down Expand Up @@ -69,10 +72,28 @@ class PeerLeftEvent(BaseEvent):
session: SessionAPI


class PeerInfo(NamedTuple):
session: SessionAPI
capabilities: Capabilities
client_version_string: str
inbound: bool


@dataclass
class GetConnectedPeersResponse(BaseEvent):

sessions: Tuple[SessionAPI, ...]
peers: Tuple[PeerInfo, ...]

@staticmethod
def from_connected_nodes(peers: Dict[SessionAPI, BasePeer]) -> 'GetConnectedPeersResponse':
return GetConnectedPeersResponse(tuple(
PeerInfo(
session=session,
capabilities=peer.connection.remote_capabilities,
client_version_string=peer.connection.safe_client_version_string,
inbound=peer.inbound
) for session, peer in peers.items()
))


class GetConnectedPeersRequest(BaseRequestResponseEvent[GetConnectedPeersResponse]):
Expand Down
10 changes: 5 additions & 5 deletions trinity/protocol/common/peer_pool_event_bus.py
Expand Up @@ -45,7 +45,7 @@
PeerCountResponse,
PeerJoinedEvent,
PeerLeftEvent,
ProtocolCapabilitiesResponse
ProtocolCapabilitiesResponse,
)
from .peer import BaseProxyPeer

Expand Down Expand Up @@ -157,7 +157,7 @@ async def handle_peer_count_requests(self) -> None:
async def handle_get_connected_peers_requests(self) -> None:
async for req in self.event_bus.stream(GetConnectedPeersRequest):
await self.event_bus.broadcast(
GetConnectedPeersResponse(tuple(self.peer_pool.connected_nodes.keys())),
GetConnectedPeersResponse.from_connected_nodes(self.peer_pool.connected_nodes),
req.broadcast_config()
)

Expand Down Expand Up @@ -304,9 +304,9 @@ async def fetch_initial_peers(self) -> Tuple[TProxyPeer, ...]:
)

return tuple([
await self.ensure_proxy_peer(session)
for session
in response.sessions
await self.ensure_proxy_peer(peer_info.session)
for peer_info
in response.peers
])

async def get_peers(self) -> Tuple[TProxyPeer, ...]:
Expand Down
32 changes: 30 additions & 2 deletions trinity/rpc/modules/admin.py
@@ -1,8 +1,10 @@
from typing import Tuple, Iterable, Dict

from typing import Tuple, Iterable, Dict, Sequence

from eth.constants import GENESIS_BLOCK_NUMBER
from eth_typing import BlockNumber
from eth_utils import encode_hex, to_dict

from lahja import EndpointAPI

from p2p.kademlia import Node
Expand All @@ -14,11 +16,14 @@
from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.protocol.common.events import (
ConnectToNodeCommand,
GetProtocolCapabilitiesRequest
GetConnectedPeersRequest,
GetProtocolCapabilitiesRequest,
PeerInfo,
)
from trinity.rpc.modules import Eth1ChainRPCModule
from trinity.rpc.typing import RpcProtocolResponse, RpcNodeInfoResponse
from trinity.server import BOUND_IP
from trinity.rpc.typing import RpcPeerResponse
from trinity._utils.version import construct_trinity_client_identifier


Expand Down Expand Up @@ -91,3 +96,26 @@ async def _generate_protocol_info(
}
for protocol, version in protocols
}

def _format_peer(self, peer_info: PeerInfo) -> RpcPeerResponse:
session = peer_info.session
return {
'enode': session.remote.uri(),
'id': str(session.id),
'name': peer_info.client_version_string,
'caps': [f"{protocol}/{version}" for protocol, version in peer_info.capabilities],
'network': {
'localAddress': f"{BOUND_IP}:{self.trinity_config.port}",
'remoteAddress': f"{session.remote.address.ip}:{session.remote.address.tcp_port}",
'inbound': peer_info.inbound
}
}

async def peers(self) -> Sequence[RpcPeerResponse]:

response = await self.event_bus.request(GetConnectedPeersRequest())

return tuple(
self._format_peer(peer_info)
for peer_info in response.peers
)
14 changes: 14 additions & 0 deletions trinity/rpc/typing.py
Expand Up @@ -30,6 +30,20 @@ class RpcNodeInfoResponse(TypedDict):
protocols: Dict[str, RpcProtocolResponse]


class RpcPeerNetworkResponse(TypedDict):
localAddress: str
remoteAddress: str
inbound: bool


class RpcPeerResponse(TypedDict):
enode: str
id: str
name: str
caps: Sequence[str]
network: RpcPeerNetworkResponse


RpcTransactionResponse = TypedDict('RpcTransactionResponse', {
'hash': HexStr,
'nonce': str,
Expand Down
32 changes: 32 additions & 0 deletions trinity/tools/factories/events.py
@@ -0,0 +1,32 @@
from typing import Sequence, Any

from p2p.abc import SessionAPI

try:
import factory
except ImportError:
raise ImportError(
"The p2p.tools.factories module requires the `factory_boy` and `faker` libraries."
)


from trinity.protocol.common.events import GetConnectedPeersResponse, PeerInfo


class GetConnectedPeersResponseFactory(factory.Factory):
class Meta:
model = GetConnectedPeersResponse

@classmethod
def from_sessions(cls,
sessions: Sequence[SessionAPI],
*args: Any,
**kwargs: Any) -> GetConnectedPeersResponse:
return GetConnectedPeersResponse(tuple(
PeerInfo(
session=session,
capabilities=(),
client_version_string='unknown',
inbound=False
) for session in sessions
))

0 comments on commit 90e6397

Please sign in to comment.