Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Refactor and fix
Browse files Browse the repository at this point in the history
Use one `run_daemon_task` for all subnets

Refactor attestation pool query

Fix orphan block pool tests
  • Loading branch information
hwwhww committed Nov 26, 2019
1 parent ad0efb2 commit a598c23
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 55 deletions.
1 change: 0 additions & 1 deletion eth2/beacon/tools/builder/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Sequence

from eth_typing import BLSSignature
from eth_utils import ValidationError, encode_hex
from ssz import get_hash_tree_root, uint64

from eth2._utils.bls import bls
Expand Down
4 changes: 2 additions & 2 deletions tests/components/eth2/beacon/test_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,9 @@ async def test_validator_get_committee_assigment(event_loop, event_bus):
state = alice.chain.get_head_state()
epoch = compute_epoch_at_slot(state.slot, state_machine.config.SLOTS_PER_EPOCH)

assert alice.this_epoch_assignment[alice_indices[0]][0] == -1
assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == -1
alice._get_local_current_epoch_assignment(alice_indices[0], epoch)
assert alice.this_epoch_assignment[alice_indices[0]][0] == epoch
assert alice.local_validator_epoch_assignment[alice_indices[0]][0] == epoch


@pytest.mark.asyncio
Expand Down
24 changes: 13 additions & 11 deletions tests/libp2p/bcc/test_receive_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from trinity.db.beacon.chain import AsyncBeaconChainDB
from trinity.protocol.bcc_libp2p.configs import (
ATTESTATION_SUBNET_COUNT,
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF,
PUBSUB_TOPIC_BEACON_ATTESTATION,
PUBSUB_TOPIC_BEACON_BLOCK,
PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION,
Expand Down Expand Up @@ -87,14 +88,18 @@ async def receive_server():
topic_msg_queues = {
PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(),
PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(),
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(),
}
subnets = set(subnet_id for subnet_id in range(ATTESTATION_SUBNET_COUNT))
for subnet_id in range(ATTESTATION_SUBNET_COUNT):
topic = (
PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id),
)
topic_msg_queues[topic] = asyncio.Queue()
chain = await get_fake_chain()
server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues)
server = ReceiveServerFactory(
chain=chain, topic_msg_queues=topic_msg_queues, subnets=subnets
)
asyncio.ensure_future(server.run())
await server.ready.wait()
try:
Expand All @@ -110,13 +115,8 @@ async def receive_server_with_mock_process_orphan_blocks_period(
topic_msg_queues = {
PUBSUB_TOPIC_BEACON_BLOCK: asyncio.Queue(),
PUBSUB_TOPIC_BEACON_ATTESTATION: asyncio.Queue(),
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF: asyncio.Queue(),
}
for subnet_id in range(ATTESTATION_SUBNET_COUNT):
topic = (
PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id),
)
topic_msg_queues[topic] = asyncio.Queue()

chain = await get_fake_chain()
server = ReceiveServerFactory(chain=chain, topic_msg_queues=topic_msg_queues)
asyncio.ensure_future(server.run())
Expand Down Expand Up @@ -383,19 +383,21 @@ async def request_beacon_blocks_by_root(peer_id, block_roots):
return requested_blocks

with monkeypatch.context() as m:
for orphan_block in (blocks[4],) + fork_blocks:
receive_server.orphan_block_pool.add(orphan_block)
await wait_until_true(
lambda: len(receive_server.orphan_block_pool) != 0, timeout=4
)
for peer in (peer1, peer2):
receive_server.p2p_node.handshaked_peers.add(peer)
m.setattr(
receive_server.p2p_node,
"request_beacon_blocks_by_root",
request_beacon_blocks_by_root,
)

for orphan_block in (blocks[4],) + fork_blocks:
receive_server.orphan_block_pool.add(orphan_block)
# Wait for receive server to process the orphan blocks
await wait_until_true(
lambda: len(receive_server.orphan_block_pool) == 0, timeout=2
lambda: len(receive_server.orphan_block_pool) == 0, timeout=4
)
# Check that both peers were requested for blocks
assert peer_1_called_event.is_set()
Expand Down
4 changes: 2 additions & 2 deletions tests/libp2p/bcc/test_topic_validator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pytest

from trinity.protocol.bcc_libp2p.configs import (
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF,
PUBSUB_TOPIC_BEACON_ATTESTATION,
PUBSUB_TOPIC_BEACON_BLOCK,
PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION,
)


Expand All @@ -14,7 +14,7 @@ async def test_setup_topic_validators(nodes):
subnet_id = 0
topic_1 = PUBSUB_TOPIC_BEACON_BLOCK
topic_2 = PUBSUB_TOPIC_BEACON_ATTESTATION
topic_3 = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=subnet_id)
topic_3 = PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF
assert topic_1 in node.pubsub.topic_validators
assert topic_2 in node.pubsub.topic_validators
assert topic_3 in node.pubsub.topic_validators
101 changes: 71 additions & 30 deletions trinity/protocol/bcc_libp2p/servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
validate_attestation_slot,
)
from eth2.beacon.typing import CommitteeIndex, Slot
from eth2.configs import Eth2Config

from trinity.protocol.bcc_libp2p.node import Node

Expand All @@ -63,6 +64,20 @@
PROCESS_ORPHAN_BLOCKS_PERIOD = 10.0


def is_valid_slot(attestation: Attestation, current_slot: Slot, config: Eth2Config) -> bool:
try:
validate_attestation_slot(
attestation.data.slot,
current_slot,
config.SLOTS_PER_EPOCH,
config.MIN_ATTESTATION_INCLUSION_DELAY,
)
except ValidationError:
return False
else:
return True


class AttestationPool(OperationPool[Attestation]):
"""
Store the attestations not yet included on chain.
Expand Down Expand Up @@ -104,6 +119,34 @@ def batch_remove(self, attestations: Iterable[Attestation]) -> None:
for attestation in attestations:
self.remove(attestation)

def get_valid_attestation_by_current_slot(
self,
slot: Slot,
config: Eth2Config
) -> Tuple[Attestation, ...]:
return tuple(
filter(
lambda attestation: is_valid_slot(attestation, slot, config),
self._pool_storage.values()
)
)

def get_acceptable_attestations(
self,
slot: Slot,
committee_index: CommitteeIndex,
beacon_block_root: SigningRoot
) -> Tuple[Attestation, ...]:
return tuple(
filter(
lambda attestation:
beacon_block_root == attestation.data.beacon_block_root and
slot == attestation.data.slot and
committee_index == attestation.data.index,
self._pool_storage.values()
)
)


class OrphanBlockPool:
"""
Expand Down Expand Up @@ -190,8 +233,7 @@ async def _run(self) -> None:
self.run_daemon_task(self._handle_beacon_attestation_loop())
self.run_daemon_task(self._handle_beacon_block_loop())
self.run_daemon_task(self._handle_aggregate_and_proof_loop())
for subnet_id in self.subnets:
self.run_daemon_task(self._handle_committee_beacon_attestation_loop(subnet_id))
self.run_daemon_task(self._handle_committee_beacon_attestation_loop())
self.run_daemon_task(self._process_orphan_blocks_loop())
self.ready.set()
await self.cancellation()
Expand Down Expand Up @@ -220,12 +262,17 @@ async def _handle_beacon_attestation_loop(self) -> None:
self._handle_beacon_attestation
)

async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) -> None:
topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id))
await self._handle_message(
topic,
self._handle_committee_beacon_attestation,
)
async def _handle_committee_beacon_attestation_loop(self) -> None:
while True:
await asyncio.sleep(0.5)
for subnet_id in self.subnets:
topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(
subnet_id=str(subnet_id)
)
await self._handle_message(
topic,
self._handle_committee_beacon_attestation,
)

async def _handle_aggregate_and_proof_loop(self) -> None:
await self._handle_message(
Expand Down Expand Up @@ -401,35 +448,29 @@ def _is_block_seen(self, block: BaseBeaconBlock) -> bool:
# Exposed APIs for Validator
#
@to_tuple
def get_ready_attestations(
self,
current_slot: Slot
) -> Iterable[Attestation]:
def get_ready_attestations(self, current_slot: Slot) -> Iterable[Attestation]:
"""
Get the attestations that are ready to be included in ``current_slot`` block.
"""
config = self.chain.get_state_machine().config
for attestation in self.attestation_pool.get_all():
try:
validate_attestation_slot(
attestation.data.slot,
current_slot,
config.SLOTS_PER_EPOCH,
config.MIN_ATTESTATION_INCLUSION_DELAY,
)
except ValidationError:
continue
else:
yield attestation
return self.attestation_pool.get_valid_attestation_by_current_slot(current_slot, config)

def get_aggregatable_attestations(
self,
slot: Slot,
committee_index: CommitteeIndex
) -> Tuple[Attestation, ...]:
return tuple(
filter(
lambda attestation:
slot == attestation.data.slot and committee_index == attestation.data.index,
self.attestation_pool.get_all()
)
"""
Get the attestations of ``slot`` and ``committee_index``.
"""
try:
block = self.chain.get_canonical_block_by_slot(slot)
except BlockNotFound:
return ()

beacon_block_root = block.signing_root
return self.attestation_pool.get_acceptable_attestations(
slot, committee_index, beacon_block_root
)

def import_attestation(self, attestation: Attestation) -> None:
Expand Down
12 changes: 3 additions & 9 deletions trinity/tools/bcc_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Tuple,
Type,
Sequence,
Set,
)

from async_generator import asynccontextmanager
Expand Down Expand Up @@ -41,15 +40,14 @@
BaseBeaconBlock,
)
from eth2.beacon.state_machines.forks.serenity import SERENITY_CONFIG
from eth2.beacon.typing import Slot, SubnetId
from eth2.beacon.typing import Slot
from eth2.configs import (
Eth2GenesisConfig,
)
from multiaddr import Multiaddr

from trinity.db.beacon.chain import AsyncBeaconChainDB

from trinity.protocol.bcc_libp2p.configs import ATTESTATION_SUBNET_COUNT
from trinity.protocol.bcc_libp2p.node import Node, PeerPool, Peer
from trinity.protocol.bcc_libp2p.servers import BCCReceiveServer
from trinity.sync.beacon.chain import BeaconChainSyncer
Expand Down Expand Up @@ -86,9 +84,7 @@ class Meta:
cancel_token = None
bootstrap_nodes = None
preferred_nodes: Tuple[Multiaddr, ...] = tuple()
subnets: Set[SubnetId] = set(
SubnetId(subnet_id) for subnet_id in range(ATTESTATION_SUBNET_COUNT)
)
subnets: None
chain = factory.SubFactory(BeaconChainFactory)

@classmethod
Expand Down Expand Up @@ -237,9 +233,7 @@ class Meta:
chain = None
p2p_node = factory.SubFactory(NodeFactory)
topic_msg_queues = None
subnets: Set[SubnetId] = set(
SubnetId(subnet_id) for subnet_id in range(ATTESTATION_SUBNET_COUNT)
)
subnets = None
cancel_token = None

@classmethod
Expand Down

0 comments on commit a598c23

Please sign in to comment.