diff --git a/eth2/beacon/tools/builder/aggregator.py b/eth2/beacon/tools/builder/aggregator.py index fa51a682d6..5b317bc086 100644 --- a/eth2/beacon/tools/builder/aggregator.py +++ b/eth2/beacon/tools/builder/aggregator.py @@ -1,7 +1,6 @@ from typing import Sequence from eth_typing import BLSSignature -from eth_utils import ValidationError, encode_hex from ssz import get_hash_tree_root, uint64 from eth2._utils.bls import bls diff --git a/tests/components/eth2/beacon/test_validator.py b/tests/components/eth2/beacon/test_validator.py index c6a2f0dd4c..54f1d9557b 100644 --- a/tests/components/eth2/beacon/test_validator.py +++ b/tests/components/eth2/beacon/test_validator.py @@ -344,9 +344,9 @@ async def test_validator_get_committee_assigment(event_loop, event_bus): state = alice.chain.get_head_state() epoch = compute_epoch_at_slot(state.slot, state_machine.config.SLOTS_PER_EPOCH) - assert alice.this_epoch_assignment[alice_indices[0]][0] == -1 + assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == -1 alice._get_local_current_epoch_assignment(alice_indices[0], epoch) - assert alice.this_epoch_assignment[alice_indices[0]][0] == epoch + assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == epoch @pytest.mark.asyncio diff --git a/tests/libp2p/bcc/test_receive_server.py b/tests/libp2p/bcc/test_receive_server.py index 7076a4edd4..039f25dedf 100644 --- a/tests/libp2p/bcc/test_receive_server.py +++ b/tests/libp2p/bcc/test_receive_server.py @@ -21,6 +21,7 @@ from trinity.db.beacon.chain import AsyncBeaconChainDB from trinity.protocol.bcc_libp2p.configs import ( ATTESTATION_SUBNET_COUNT, + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_ATTESTATION, PUBSUB_TOPIC_BEACON_BLOCK, PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION, @@ -87,14 +88,18 @@ async def receive_server(): topic_msg_queues = { PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(), PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(), + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(), } + subnets = set(subnet_id for subnet_id in range(ATTESTATION_SUBNET_COUNT)) for subnet_id in range(ATTESTATION_SUBNET_COUNT): topic = ( PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id), ) topic_msg_queues[topic] = asyncio.Queue() chain = await get_fake_chain() - server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues) + server = ReceiveServerFactory( + chain=chain, topic_msg_queues=topic_msg_queues, subnets=subnets + ) asyncio.ensure_future(server.run()) await server.ready.wait() try: @@ -110,13 +115,8 @@ async def receive_server_with_mock_process_orphan_blocks_period( topic_msg_queues = { PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(), PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(), + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(), } - for subnet_id in range(ATTESTATION_SUBNET_COUNT): - topic = ( - PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id), - ) - topic_msg_queues[topic] = asyncio.Queue() - chain = await get_fake_chain() server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues) asyncio.ensure_future(server.run()) @@ -383,6 +383,11 @@ async def request_beacon_blocks_by_root(peer_id, block_roots): return requested_blocks with monkeypatch.context() as m: + for orphan_block in (blocks[4],) + fork_blocks: + receive_server.orphan_block_pool.add(orphan_block) + await wait_until_true( + lambda: len(receive_server.orphan_block_pool) != 0, timeout=4 + ) for peer in (peer1, peer2): receive_server.p2p_node.handshaked_peers.add(peer) m.setattr( @@ -390,12 +395,9 @@ async def request_beacon_blocks_by_root(peer_id, block_roots): "request_beacon_blocks_by_root", request_beacon_blocks_by_root, ) - - for orphan_block in (blocks[4],) + fork_blocks: - receive_server.orphan_block_pool.add(orphan_block) # Wait for receive server to process the orphan blocks await wait_until_true( - lambda: len(receive_server.orphan_block_pool) == 0, timeout=2 + lambda: len(receive_server.orphan_block_pool) == 0, timeout=4 ) # Check that both peers were requested for blocks assert peer_1_called_event.is_set() diff --git a/tests/libp2p/bcc/test_topic_validator.py b/tests/libp2p/bcc/test_topic_validator.py index d4e90bdbc6..d3d9dc8cfc 100644 --- a/tests/libp2p/bcc/test_topic_validator.py +++ b/tests/libp2p/bcc/test_topic_validator.py @@ -1,9 +1,9 @@ import pytest from trinity.protocol.bcc_libp2p.configs import ( + PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, PUBSUB_TOPIC_BEACON_ATTESTATION, PUBSUB_TOPIC_BEACON_BLOCK, - PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION, ) @@ -14,7 +14,7 @@ async def test_setup_topic_validators(nodes): subnet_id = 0 topic_1 = PUBSUB_TOPIC_BEACON_BLOCK topic_2 = PUBSUB_TOPIC_BEACON_ATTESTATION - topic_3 = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id) + topic_3 = PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF assert topic_1 in node.pubsub.topic_validators assert topic_2 in node.pubsub.topic_validators assert topic_3 in node.pubsub.topic_validators diff --git a/trinity/protocol/bcc_libp2p/servers.py b/trinity/protocol/bcc_libp2p/servers.py index 6c74015c1c..57f978511a 100644 --- a/trinity/protocol/bcc_libp2p/servers.py +++ b/trinity/protocol/bcc_libp2p/servers.py @@ -50,6 +50,7 @@ validate_attestation_slot, ) from eth2.beacon.typing import CommitteeIndex, Slot +from eth2.configs import Eth2Config from trinity.protocol.bcc_libp2p.node import Node @@ -63,6 +64,20 @@ PROCESS_ORPHAN_BLOCKS_PERIOD = 10.0 +def is_valid_slot(attestation: Attestation, current_slot: Slot, config: Eth2Config) -> bool: + try: + validate_attestation_slot( + attestation.data.slot, + current_slot, + config.SLOTS_PER_EPOCH, + config.MIN_ATTESTATION_INCLUSION_DELAY, + ) + except ValidationError: + return False + else: + return True + + class AttestationPool(OperationPool[Attestation]): """ Store the attestations not yet included on chain. @@ -104,6 +119,34 @@ def batch_remove(self, attestations: Iterable[Attestation]) -> None: for attestation in attestations: self.remove(attestation) + def get_valid_attestation_by_current_slot( + self, + slot: Slot, + config: Eth2Config + ) -> Tuple[Attestation, ...]: + return tuple( + filter( + lambda attestation: is_valid_slot(attestation, slot, config), + self._pool_storage.values() + ) + ) + + def get_acceptable_attestations( + self, + slot: Slot, + committee_index: CommitteeIndex, + beacon_block_root: SigningRoot + ) -> Tuple[Attestation, ...]: + return tuple( + filter( + lambda attestation: + beacon_block_root == attestation.data.beacon_block_root and + slot == attestation.data.slot and + committee_index == attestation.data.index, + self._pool_storage.values() + ) + ) + class OrphanBlockPool: """ @@ -190,8 +233,7 @@ async def _run(self) -> None: self.run_daemon_task(self._handle_beacon_attestation_loop()) self.run_daemon_task(self._handle_beacon_block_loop()) self.run_daemon_task(self._handle_aggregate_and_proof_loop()) - for subnet_id in self.subnets: - self.run_daemon_task(self._handle_committee_beacon_attestation_loop(subnet_id)) + self.run_daemon_task(self._handle_committee_beacon_attestation_loop()) self.run_daemon_task(self._process_orphan_blocks_loop()) self.ready.set() await self.cancellation() @@ -220,12 +262,17 @@ async def _handle_beacon_attestation_loop(self) -> None: self._handle_beacon_attestation ) - async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) -> None: - topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id)) - await self._handle_message( - topic, - self._handle_committee_beacon_attestation, - ) + async def _handle_committee_beacon_attestation_loop(self) -> None: + while True: + await asyncio.sleep(0.5) + for subnet_id in self.subnets: + topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute( + subnet_id=str(subnet_id) + ) + await self._handle_message( + topic, + self._handle_committee_beacon_attestation, + ) async def _handle_aggregate_and_proof_loop(self) -> None: await self._handle_message( @@ -401,35 +448,29 @@ def _is_block_seen(self, block: BaseBeaconBlock) -> bool: # Exposed APIs for Validator # @to_tuple - def get_ready_attestations( - self, - current_slot: Slot - ) -> Iterable[Attestation]: + def get_ready_attestations(self, current_slot: Slot) -> Iterable[Attestation]: + """ + Get the attestations that are ready to be included in ``current_slot`` block. + """ config = self.chain.get_state_machine().config - for attestation in self.attestation_pool.get_all(): - try: - validate_attestation_slot( - attestation.data.slot, - current_slot, - config.SLOTS_PER_EPOCH, - config.MIN_ATTESTATION_INCLUSION_DELAY, - ) - except ValidationError: - continue - else: - yield attestation + return self.attestation_pool.get_valid_attestation_by_current_slot(current_slot, config) def get_aggregatable_attestations( self, slot: Slot, committee_index: CommitteeIndex ) -> Tuple[Attestation, ...]: - return tuple( - filter( - lambda attestation: - slot == attestation.data.slot and committee_index == attestation.data.index, - self.attestation_pool.get_all() - ) + """ + Get the attestations of ``slot`` and ``committee_index``. + """ + try: + block = self.chain.get_canonical_block_by_slot(slot) + except BlockNotFound: + return () + + beacon_block_root = block.signing_root + return self.attestation_pool.get_acceptable_attestations( + slot, committee_index, beacon_block_root ) def import_attestation(self, attestation: Attestation) -> None: diff --git a/trinity/tools/bcc_factories.py b/trinity/tools/bcc_factories.py index afa92dbac2..771b5c86b6 100644 --- a/trinity/tools/bcc_factories.py +++ b/trinity/tools/bcc_factories.py @@ -7,7 +7,6 @@ Tuple, Type, Sequence, - Set, ) from async_generator import asynccontextmanager @@ -41,7 +40,7 @@ BaseBeaconBlock, ) from eth2.beacon.state_machines.forks.serenity import SERENITY_CONFIG -from eth2.beacon.typing import Slot, SubnetId +from eth2.beacon.typing import Slot from eth2.configs import ( Eth2GenesisConfig, ) @@ -49,7 +48,6 @@ from trinity.db.beacon.chain import AsyncBeaconChainDB -from trinity.protocol.bcc_libp2p.configs import ATTESTATION_SUBNET_COUNT from trinity.protocol.bcc_libp2p.node import Node, PeerPool, Peer from trinity.protocol.bcc_libp2p.servers import BCCReceiveServer from trinity.sync.beacon.chain import BeaconChainSyncer @@ -86,9 +84,7 @@ class Meta: cancel_token = None bootstrap_nodes = None preferred_nodes: Tuple[Multiaddr, ...] = tuple() - subnets: Set[SubnetId] = set( - SubnetId(subnet_id) for subnet_id in range(ATTESTATION_SUBNET_COUNT) - ) + subnets: None chain = factory.SubFactory(BeaconChainFactory) @classmethod @@ -237,9 +233,7 @@ class Meta: chain = None p2p_node = factory.SubFactory(NodeFactory) topic_msg_queues = None - subnets: Set[SubnetId] = set( - SubnetId(subnet_id) for subnet_id in range(ATTESTATION_SUBNET_COUNT) - ) + subnets = None cancel_token = None @classmethod