Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2058 from njgheorghita/new-block-component
Browse files Browse the repository at this point in the history
Add new block component to propagate new block messages
  • Loading branch information
njgheorghita committed Oct 9, 2020
2 parents 4a7558f + 3e733e6 commit 32c8997
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 0 deletions.
2 changes: 2 additions & 0 deletions newsfragments/2058.feature.rst
@@ -0,0 +1,2 @@
Introduce NewBlockComponent to be a better p2p participant. By rebroadcasting
blocks as they are received from peers, and after they have been imported.
Empty file.
152 changes: 152 additions & 0 deletions trinity/components/builtin/new_block/component.py
@@ -0,0 +1,152 @@
import math
import random
from typing import (
Dict,
Iterable,
List,
Tuple,
)

from async_service import (
Service,
background_trio_service,
)
from eth.abc import BlockHeaderAPI, BlockAPI
from eth.consensus.pow import check_pow
from eth_utils import (
ValidationError,
to_tuple,
)
from lahja import EndpointAPI
import trio

from p2p.abc import SessionAPI
from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.extensibility import TrioIsolatedComponent
from trinity.protocol.eth.events import NewBlockEvent
from trinity.protocol.eth.payloads import (
NewBlockHash,
NewBlockPayload,
)
from trinity.protocol.eth.peer import (
ETHProxyPeerPool,
ETHProxyPeer,
)
from trinity.sync.common.events import NewBlockImported
from trinity._utils.logging import get_logger


class NewBlockComponent(TrioIsolatedComponent):
"""
Propogate newly received and imported blocks to peers, according to devp2p rules.
https://github.com/ethereum/devp2p/blob/master/caps/eth.md#block-propagation
"""
name = "NewBlockComponent"

@property
def is_enabled(self) -> bool:
return True

async def do_run(self, event_bus: EndpointAPI) -> None:
proxy_peer_pool = ETHProxyPeerPool(event_bus, TO_NETWORKING_BROADCAST_CONFIG)
async with background_trio_service(proxy_peer_pool):
service = NewBlockService(event_bus, proxy_peer_pool)
async with background_trio_service(service) as manager:
await manager.wait_finished()


class NewBlockService(Service):

logger = get_logger('trinity.components.new_block.NewBlockService')

def __init__(self, event_bus: EndpointAPI, peer_pool: ETHProxyPeerPool) -> None:
self._event_bus = event_bus
self._peer_pool = peer_pool
# tracks which peers have seen a block
# todo: old blocks need to be pruned to avoid unbounded growth of tracker
self._peer_block_tracker: Dict[bytes, List[str]] = {}

async def run(self) -> None:
self.manager.run_daemon_task(self._handle_imported_blocks)

async for event in self._event_bus.stream(NewBlockEvent):
self.manager.run_task(self._handle_new_block, event.session, event.command.payload)

async def _handle_imported_blocks(self) -> None:
async for event in self._event_bus.stream(NewBlockImported):
await self._broadcast_imported_block(event.block)

async def _handle_new_block(self, sender: SessionAPI, block: NewBlockPayload) -> None:
header = block.block.header
sender_peer_str = str(ETHProxyPeer.from_session(
sender,
self._event_bus,
TO_NETWORKING_BROADCAST_CONFIG
))

# Add peer to tracker if we've seen this block before
if header.hash in self._peer_block_tracker:
if sender_peer_str not in self._peer_block_tracker[header.hash]:
self._peer_block_tracker[header.hash].append(sender_peer_str)
else:
# Verify the validity of block, add to tracker and broadcast to eligible peers
try:
check_pow(
header.block_number,
header.mining_hash,
header.mix_hash,
header.nonce,
header.difficulty
)
except ValidationError:
self.logger.info("Received invalid block from peer: %s", sender_peer_str)
else:
self._peer_block_tracker[header.hash] = [sender_peer_str]
await self._broadcast_newly_seen_block(header)

async def _broadcast_imported_block(self, block: BlockAPI) -> None:
"""
Broadcast `NewBlockHashes` for newly imported block to eligible peers,
aka those that haven't seen the block.
"""
all_peers = await self._peer_pool.get_peers()
if block.hash in self._peer_block_tracker:
eligible_peers = self._filter_eligible_peers(all_peers, block.hash)
else:
self._peer_block_tracker[block.hash] = []
eligible_peers = all_peers

new_block_hash = NewBlockHash(hash=block.hash, number=block.number)
for peer in eligible_peers:
target_peer = await self._peer_pool.ensure_proxy_peer(peer.session)
target_peer.eth_api.send_new_block_hashes((new_block_hash,))
self._peer_block_tracker[block.hash].append(str(target_peer))
# add checkpoint here to guarantee the event loop is released per iteration
await trio.sleep(0)

async def _broadcast_newly_seen_block(self, header: BlockHeaderAPI) -> None:
"""
Broadcast `NewBlock` for newly received block to square root of
total # of connected peers, aka those that haven't seen the block.
"""
all_peers = await self._peer_pool.get_peers()
eligible_peers = self._filter_eligible_peers(all_peers, header.hash)
number_of_broadcasts = int(math.sqrt(len(all_peers)))
sample_size = min(len(eligible_peers), number_of_broadcasts)
broadcast_peers = random.sample(eligible_peers, sample_size)

for peer in broadcast_peers:
target_peer = await self._peer_pool.ensure_proxy_peer(peer.session)
target_peer.eth_api.send_block_headers((header,))
self._peer_block_tracker[header.hash].append(str(target_peer))

@to_tuple
def _filter_eligible_peers(self,
all_peers: Tuple[ETHProxyPeer],
block_hash: bytes) -> Iterable[ETHProxyPeer]:
"""
Filter and return peers who have not seen the given block hash.
"""
for peer in all_peers:
if str(peer) not in self._peer_block_tracker[block_hash]:
yield peer
4 changes: 4 additions & 0 deletions trinity/components/registry.py
Expand Up @@ -37,6 +37,9 @@
from trinity.components.builtin.network_db.component import (
NetworkDBComponent,
)
from trinity.components.builtin.new_block.component import (
NewBlockComponent,
)
from trinity.components.builtin.peer_discovery.component import (
PeerDiscoveryComponent,
)
Expand Down Expand Up @@ -78,6 +81,7 @@
ExportBlockComponent,
ImportBlockComponent,
MetricsComponent,
NewBlockComponent,
RequestServerComponent,
SyncerComponent,
TxComponent,
Expand Down
10 changes: 10 additions & 0 deletions trinity/protocol/eth/proxy.py
Expand Up @@ -32,6 +32,7 @@
from .commands import (
BlockBodiesV65,
BlockHeadersV65,
NewBlockHashes,
NodeDataV65,
ReceiptsV65,
Transactions,
Expand All @@ -42,6 +43,7 @@
GetBlockHeadersRequest,
GetNodeDataRequest,
GetReceiptsRequest,
NewBlockHashesEvent,
SendBlockBodiesEvent,
SendBlockHeadersEvent,
SendNodeDataEvent,
Expand All @@ -50,6 +52,7 @@
GetPooledTransactionsRequest,
SendPooledTransactionsEvent,
)
from .payloads import NewBlockHash


class ProxyETHAPI:
Expand Down Expand Up @@ -218,6 +221,13 @@ def send_block_headers(self, headers: Sequence[BlockHeaderAPI]) -> None:
self._broadcast_config,
)

def send_new_block_hashes(self, new_block_hashes: Sequence[NewBlockHash]) -> None:
command = NewBlockHashes(tuple(new_block_hashes))
self._event_bus.broadcast_nowait(
NewBlockHashesEvent(self.session, command),
self._broadcast_config,
)

def send_block_bodies(self, blocks: Sequence[BlockAPI]) -> None:
block_bodies = tuple(
BlockBody(block.transactions, block.uncles)
Expand Down
9 changes: 9 additions & 0 deletions trinity/sync/beam/importer.py
Expand Up @@ -53,6 +53,7 @@

from trinity._utils.timer import Timer
from trinity.chains.full import FullChain
from trinity.constants import FIRE_AND_FORGET_BROADCASTING
from trinity.exceptions import StateUnretrievable
from trinity.sync.beam.constants import (
BLOCK_IMPORT_MISSING_STATE_TIMEOUT,
Expand All @@ -69,6 +70,7 @@
MissingAccountResult,
MissingBytecodeResult,
MissingStorageResult,
NewBlockImported,
StatelessBlockImportDone,
)
from trinity._utils.logging import get_logger
Expand Down Expand Up @@ -493,6 +495,13 @@ def _broadcast_import_complete(
),
broadcast_config,
)
if completed:
event_bus.broadcast_nowait(
NewBlockImported(
block,
),
FIRE_AND_FORGET_BROADCASTING,
)


def partial_import_block(beam_chain: BeamChain,
Expand Down
9 changes: 9 additions & 0 deletions trinity/sync/common/events.py
Expand Up @@ -140,6 +140,15 @@ class StatelessBlockImportDone(BaseEvent):
exception: BaseException


@dataclass
class NewBlockImported(BaseEvent):
"""
Event that is only emitted after a new block has been successfully imported.
"""

block: BlockAPI


@dataclass
class DoStatelessBlockImport(BaseRequestResponseEvent[StatelessBlockImportDone]):
"""
Expand Down

0 comments on commit 32c8997

Please sign in to comment.