Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Create BroadcastImportedBlock event.
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Oct 7, 2020
1 parent 153d96f commit 08798ba
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
10 changes: 2 additions & 8 deletions trinity/components/builtin/new_block/component.py
Expand Up @@ -31,7 +31,7 @@
ETHProxyPeerPool,
ETHProxyPeer,
)
from trinity.sync.common.events import StatelessBlockImportDone
from trinity.sync.common.events import BroadcastImportedBlock
from trinity._utils.logging import get_logger


Expand Down Expand Up @@ -71,13 +71,7 @@ async def run(self) -> None:
self.manager.run_task(self._handle_new_block, event.session, event.command.payload)

async def _handle_imported_blocks(self) -> None:
async for event in self._event_bus.stream(StatelessBlockImportDone):
if not event.completed:
self.logger.info(
"Stateless block import was not completed, skipping broadcast to peers."
)
return

async for event in self._event_bus.stream(BroadcastImportedBlock):
await self._broadcast_imported_block(event.block)

async def _handle_new_block(self, sender: SessionAPI, block: NewBlockPayload) -> None:
Expand Down
11 changes: 10 additions & 1 deletion trinity/sync/beam/importer.py
Expand Up @@ -53,6 +53,7 @@

from trinity._utils.timer import Timer
from trinity.chains.full import FullChain
from trinity.constants import FIRE_AND_FORGET_BROADCASTING
from trinity.exceptions import StateUnretrievable
from trinity.sync.beam.constants import (
BLOCK_IMPORT_MISSING_STATE_TIMEOUT,
Expand All @@ -61,6 +62,7 @@
NUM_PREVIEW_SHARDS,
)
from trinity.sync.common.events import (
BroadcastImportedBlock,
CollectMissingAccount,
CollectMissingBytecode,
CollectMissingStorage,
Expand Down Expand Up @@ -491,8 +493,15 @@ def _broadcast_import_complete(
future.result() if completed else None,
future.exception() if completed else None,
),
# broadcast_config,
broadcast_config,
)
if completed:
event_bus.broadcast_nowait(
BroadcastImportedBlock(
block,
),
FIRE_AND_FORGET_BROADCASTING,
)


def partial_import_block(beam_chain: BeamChain,
Expand Down
11 changes: 11 additions & 0 deletions trinity/sync/common/events.py
Expand Up @@ -140,6 +140,17 @@ class StatelessBlockImportDone(BaseEvent):
exception: BaseException


@dataclass
class BroadcastImportedBlock(BaseEvent):
"""
Event that is only emitted after a new block has been successfully imported.
This event communicates to the `NewBlockComponent` that a block is ready to
be broadcast to eligible peers.
"""

block: BlockAPI


@dataclass
class DoStatelessBlockImport(BaseRequestResponseEvent[StatelessBlockImportDone]):
"""
Expand Down

0 comments on commit 08798ba

Please sign in to comment.