From e787c73124b89b8eb18e1be8538835a845485256 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 9 Dec 2019 18:40:23 +0800 Subject: [PATCH 01/18] Add eth1 monitor logger --- trinity/components/eth2/eth1_monitor/eth1_monitor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index ec9bf10981..499cb22cf8 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -1,5 +1,6 @@ import bisect from collections import OrderedDict +import logging from typing import ( Any, AsyncGenerator, @@ -88,6 +89,8 @@ def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: class Eth1Monitor(Service): + logger = logging.getLogger('trinity.eth1_monitor') + _w3: Web3 _deposit_contract: "Web3.eth.contract" @@ -148,6 +151,7 @@ def highest_processed_block_number(self) -> BlockNumber: return self._db.highest_processed_block_number async def run(self) -> None: + self.logger.info("Eth1 Monitor up") self.manager.run_daemon_task(self._handle_new_logs) self.manager.run_daemon_task( self._run_handle_request, *(GetDepositRequest, self._handle_get_deposit) @@ -280,7 +284,7 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: Keep polling latest blocks, and yield the blocks whose number is `latest_block.number - self._num_blocks_confirmed`. """ - while True: + while self.is_running: block = _w3_get_block(self._w3, "latest") target_block_number = BlockNumber(block.number - self._num_blocks_confirmed) from_block_number = self.highest_processed_block_number From 05a688cd428d0d6815f1fcb6737c632a3291da4c Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 9 Dec 2019 21:38:20 +0800 Subject: [PATCH 02/18] Add eth1 data provider abstraction --- .../eth2/eth1_monitor/eth1_data_provider.py | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 trinity/components/eth2/eth1_monitor/eth1_data_provider.py diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py new file mode 100644 index 0000000000..4e0ee49bbe --- /dev/null +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -0,0 +1,94 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional, Tuple + +from eth_typing import Address, BlockNumber, Hash32 + +from eth_utils import encode_hex, event_abi_to_log_topic +from web3 import Web3 +from web3.utils.events import get_event_data + +from eth2.beacon.typing import Timestamp + +from .eth1_monitor import Eth1Block, DepositLog + + +class BaseEth1DataProvider(ABC): + + @abstractmethod + def get_block(self, arg: Optional[int, str]) -> Eth1Block: + ... + + @abstractmethod + def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: + ... + + @abstractmethod + def get_deposit_count(self, block_number: BlockNumber) -> int: + ... + + @abstractmethod + def get_deposit_root(self, block_number: BlockNumber) -> Hash32: + ... + + +class Web3Eth1DataProvider(BaseEth1DataProvider): + + w3: Web3 + + _deposit_contract: "Web3.eth.contract" + _deposit_event_abi: Dict[str, Any] + _deposit_event_topic: str + + def __init__( + self, + w3: Web3, + deposit_contract_address: Address, + deposit_contract_abi: Dict[str, Any], + ) -> None: + self.w3 = w3 + self._deposit_contract = self._w3.eth.contract( + address=deposit_contract_address, abi=deposit_contract_abi + ) + self._deposit_event_abi = ( + self._deposit_contract.events.DepositEvent._get_event_abi() + ) + self._deposit_event_topic = encode_hex( + event_abi_to_log_topic(self._deposit_event_abi) + ) + + def get_block(self, arg: Optional[int, str]) -> Eth1Block: + block_dict = self._w3.eth.getBlock(arg) + return Eth1Block( + block_hash=Hash32(block_dict["hash"]), + number=BlockNumber(block_dict["number"]), + timestamp=Timestamp(block_dict["timestamp"]), + ) + + def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: + # NOTE: web3 v4 does not support `contract.events.Event.getLogs`. + # After upgrading to v5, we can change to use the function. + logs = self._w3.eth.getLogs( + { + "fromBlock": block_number, + "toBlock": block_number, + "address": self._deposit_contract.address, + "topics": [self._deposit_event_topic], + } + ) + parsed_logs = tuple( + DepositLog.from_contract_log_dict( + get_event_data(self._deposit_event_abi, log) + ) + for log in logs + ) + return parsed_logs + + def get_deposit_count(self, block_number: BlockNumber) -> int: + return self._deposit_contract.functions.get_deposit_count().call( + block_identifier=block_number + ) + + def get_deposit_root(self, block_number: BlockNumber) -> Hash32: + return self._deposit_contract.functions.get_deposit_root().call( + block_identifier=block_number + ) From d5244d1b7ee7395f9446835f551755b8beec0ce7 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 9 Dec 2019 21:43:10 +0800 Subject: [PATCH 03/18] Abstract eth1 data provider in Eth1Monitor --- tests-trio/eth1-monitor/conftest.py | 17 +++- tests-trio/eth1-monitor/test_eth1_monitor.py | 7 +- .../eth2/eth1_monitor/eth1_data_provider.py | 49 +++++++--- .../eth2/eth1_monitor/eth1_monitor.py | 91 +++---------------- 4 files changed, 66 insertions(+), 98 deletions(-) diff --git a/tests-trio/eth1-monitor/conftest.py b/tests-trio/eth1-monitor/conftest.py index 238a287b66..5ad5e59cd9 100644 --- a/tests-trio/eth1-monitor/conftest.py +++ b/tests-trio/eth1-monitor/conftest.py @@ -20,6 +20,7 @@ from trinity.components.eth2.eth1_monitor.configs import deposit_contract_json from trinity.components.eth2.eth1_monitor.eth1_monitor import Eth1Monitor +from trinity.components.eth2.eth1_monitor.eth1_data_provider import Web3Eth1DataProvider from trinity.components.eth2.eth1_monitor.factories import DepositDataFactory from trinity.tools.factories.db import AtomicDBFactory @@ -82,19 +83,25 @@ def func_do_deposit(w3, deposit_contract): return functools.partial(deposit, w3=w3, deposit_contract=deposit_contract) +@pytest.fixture +async def eth1_data_provider(w3, deposit_contract): + return Web3Eth1DataProvider( + w3=w3, + deposit_contract_address=deposit_contract.address, + deposit_contract_abi=deposit_contract.abi, + ) + + @pytest.fixture async def eth1_monitor( - w3, - deposit_contract, + eth1_data_provider, num_blocks_confirmed, polling_period, endpoint_server, start_block_number, ): m = Eth1Monitor( - w3=w3, - deposit_contract_address=deposit_contract.address, - deposit_contract_abi=deposit_contract.abi, + eth1_data_provider=eth1_data_provider, num_blocks_confirmed=num_blocks_confirmed, polling_period=polling_period, start_block_number=start_block_number, diff --git a/tests-trio/eth1-monitor/test_eth1_monitor.py b/tests-trio/eth1-monitor/test_eth1_monitor.py index a334e6a066..a71aad9e85 100644 --- a/tests-trio/eth1-monitor/test_eth1_monitor.py +++ b/tests-trio/eth1-monitor/test_eth1_monitor.py @@ -28,8 +28,7 @@ @pytest.mark.trio async def test_logs_handling( - w3, - deposit_contract, + eth1_data_provider, tester, num_blocks_confirmed, polling_period, @@ -40,9 +39,7 @@ async def test_logs_handling( amount_0 = func_do_deposit() amount_1 = func_do_deposit() m = Eth1Monitor( - w3=w3, - deposit_contract_address=deposit_contract.address, - deposit_contract_abi=deposit_contract.abi, + eth1_data_provider=eth1_data_provider, num_blocks_confirmed=num_blocks_confirmed, polling_period=polling_period, start_block_number=start_block_number, diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 4e0ee49bbe..9a91e1a365 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -1,21 +1,46 @@ from abc import ABC, abstractmethod -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, NamedTuple, Tuple, Union -from eth_typing import Address, BlockNumber, Hash32 +from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 from eth_utils import encode_hex, event_abi_to_log_topic from web3 import Web3 from web3.utils.events import get_event_data -from eth2.beacon.typing import Timestamp - -from .eth1_monitor import Eth1Block, DepositLog +from eth2.beacon.typing import Gwei, Timestamp + + +class Eth1Block(NamedTuple): + block_hash: Hash32 + number: BlockNumber + timestamp: Timestamp + + +class DepositLog(NamedTuple): + block_hash: Hash32 + pubkey: BLSPubkey + # NOTE: The following noqa is to avoid a bug in pycodestyle. We can remove it after upgrading + # `flake8`. Ref: https://github.com/PyCQA/pycodestyle/issues/635#issuecomment-411916058 + withdrawal_credentials: Hash32 # noqa: E701 + amount: Gwei + signature: BLSSignature + + @classmethod + def from_contract_log_dict(cls, log: Dict[Any, Any]) -> "DepositLog": + log_args = log["args"] + return cls( + block_hash=log["blockHash"], + pubkey=log_args["pubkey"], + withdrawal_credentials=log_args["withdrawal_credentials"], + amount=Gwei(int.from_bytes(log_args["amount"], "little")), + signature=log_args["signature"], + ) class BaseEth1DataProvider(ABC): @abstractmethod - def get_block(self, arg: Optional[int, str]) -> Eth1Block: + def get_block(self, arg: Union[BlockNumber, str]) -> Eth1Block: ... @abstractmethod @@ -23,7 +48,7 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: ... @abstractmethod - def get_deposit_count(self, block_number: BlockNumber) -> int: + def get_deposit_count(self, block_number: BlockNumber) -> bytes: ... @abstractmethod @@ -46,7 +71,7 @@ def __init__( deposit_contract_abi: Dict[str, Any], ) -> None: self.w3 = w3 - self._deposit_contract = self._w3.eth.contract( + self._deposit_contract = self.w3.eth.contract( address=deposit_contract_address, abi=deposit_contract_abi ) self._deposit_event_abi = ( @@ -56,8 +81,8 @@ def __init__( event_abi_to_log_topic(self._deposit_event_abi) ) - def get_block(self, arg: Optional[int, str]) -> Eth1Block: - block_dict = self._w3.eth.getBlock(arg) + def get_block(self, arg: Union[int, str]) -> Eth1Block: + block_dict = self.w3.eth.getBlock(arg) return Eth1Block( block_hash=Hash32(block_dict["hash"]), number=BlockNumber(block_dict["number"]), @@ -67,7 +92,7 @@ def get_block(self, arg: Optional[int, str]) -> Eth1Block: def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: # NOTE: web3 v4 does not support `contract.events.Event.getLogs`. # After upgrading to v5, we can change to use the function. - logs = self._w3.eth.getLogs( + logs = self.w3.eth.getLogs( { "fromBlock": block_number, "toBlock": block_number, @@ -83,7 +108,7 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: ) return parsed_logs - def get_deposit_count(self, block_number: BlockNumber) -> int: + def get_deposit_count(self, block_number: BlockNumber) -> bytes: return self._deposit_contract.functions.get_deposit_count().call( block_identifier=block_number ) diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index 499cb22cf8..0e52c02278 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -5,8 +5,6 @@ Any, AsyncGenerator, Callable, - Dict, - NamedTuple, Sequence, Tuple, Type, @@ -15,18 +13,15 @@ ) from async_service import Service -from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 +from eth_typing import BlockNumber, Hash32 -from eth_utils import encode_hex, event_abi_to_log_topic from lahja import EndpointAPI import trio from web3 import Web3 -from web3.utils.events import get_event_data from eth.abc import AtomicDatabaseAPI from eth2.beacon.typing import Timestamp -from eth2.beacon.typing import Gwei from eth2.beacon.types.deposits import Deposit from eth2.beacon.types.deposit_data import DepositData from eth2.beacon.types.eth1_data import Eth1Data @@ -36,6 +31,7 @@ ) from .db import BaseDepositDataDB, ListCachedDepositDataDB +from .eth1_data_provider import BaseEth1DataProvider, DepositLog, Eth1Block from .events import ( GetDepositResponse, GetDepositRequest, @@ -52,33 +48,6 @@ TRequest = TypeVar("TRequest", bound=Union[GetDepositRequest, GetEth1DataRequest]) -class Eth1Block(NamedTuple): - block_hash: Hash32 - number: BlockNumber - timestamp: Timestamp - - -class DepositLog(NamedTuple): - block_hash: Hash32 - pubkey: BLSPubkey - # NOTE: The following noqa is to avoid a bug in pycodestyle. We can remove it after upgrading - # `flake8`. Ref: https://github.com/PyCQA/pycodestyle/issues/635#issuecomment-411916058 - withdrawal_credentials: Hash32 # noqa: E701 - amount: Gwei - signature: BLSSignature - - @classmethod - def from_contract_log_dict(cls, log: Dict[Any, Any]) -> "DepositLog": - log_args = log["args"] - return cls( - block_hash=log["blockHash"], - pubkey=log_args["pubkey"], - withdrawal_credentials=log_args["withdrawal_credentials"], - amount=Gwei(int.from_bytes(log_args["amount"], "little")), - signature=log_args["signature"], - ) - - def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: block_dict = w3.eth.getBlock(*args, **kwargs) return Eth1Block( @@ -91,11 +60,8 @@ def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: class Eth1Monitor(Service): logger = logging.getLogger('trinity.eth1_monitor') - _w3: Web3 + _eth1_data_provider: BaseEth1DataProvider - _deposit_contract: "Web3.eth.contract" - _deposit_event_abi: Dict[str, Any] - _deposit_event_topic: str # Number of blocks we wait to consider a block is "confirmed". This is used to avoid # mainchain forks. # We always get a `block` and parse the logs from it, where @@ -114,25 +80,14 @@ class Eth1Monitor(Service): def __init__( self, *, - w3: Web3, - deposit_contract_address: Address, - deposit_contract_abi: Dict[str, Any], + eth1_data_provider: BaseEth1DataProvider, num_blocks_confirmed: int, polling_period: float, start_block_number: BlockNumber, event_bus: EndpointAPI, base_db: AtomicDatabaseAPI, ) -> None: - self._w3 = w3 - self._deposit_contract = self._w3.eth.contract( - address=deposit_contract_address, abi=deposit_contract_abi - ) - self._deposit_event_abi = ( - self._deposit_contract.events.DepositEvent._get_event_abi() - ) - self._deposit_event_topic = encode_hex( - event_abi_to_log_topic(self._deposit_event_abi) - ) + self._eth1_data_provider = eth1_data_provider self._num_blocks_confirmed = num_blocks_confirmed self._polling_period = polling_period self._event_bus = event_bus @@ -211,7 +166,7 @@ def _get_eth1_data( f"`distance`={distance}, ", f"eth1_voting_period_start_block_number={eth1_voting_period_start_block_number}", ) - block_hash = _w3_get_block(self._w3, target_block_number).block_hash + block_hash = self._eth1_data_provider.get_block(target_block_number).block_hash # `Eth1Data.deposit_count`: get the `deposit_count` corresponding to the block. accumulated_deposit_count = self._get_accumulated_deposit_count( target_block_number @@ -284,8 +239,8 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: Keep polling latest blocks, and yield the blocks whose number is `latest_block.number - self._num_blocks_confirmed`. """ - while self.is_running: - block = _w3_get_block(self._w3, "latest") + while True: + block = self._eth1_data_provider.get_block("latest") target_block_number = BlockNumber(block.number - self._num_blocks_confirmed) from_block_number = self.highest_processed_block_number if target_block_number > from_block_number: @@ -293,7 +248,7 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: for block_number in range( from_block_number + 1, target_block_number + 1 ): - yield _w3_get_block(self._w3, block_number) + yield self._eth1_data_provider.get_block(BlockNumber(block_number)) await trio.sleep(self._polling_period) def _handle_block_data(self, block: Eth1Block) -> None: @@ -314,25 +269,9 @@ def _handle_block_data(self, block: Eth1Block) -> None: def _get_logs_from_block(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: """ - Get the logs inside the block with number `block_number`. + Get the parsed logs inside the block with number `block_number`. """ - # NOTE: web3 v4 does not support `contract.events.Event.getLogs`. - # After upgrading to v5, we can change to use the function. - logs = self._w3.eth.getLogs( - { - "fromBlock": block_number, - "toBlock": block_number, - "address": self._deposit_contract.address, - "topics": [self._deposit_event_topic], - } - ) - parsed_logs = tuple( - DepositLog.from_contract_log_dict( - get_event_data(self._deposit_event_abi, log) - ) - for log in logs - ) - return parsed_logs + return self._eth1_data_provider.get_logs(block_number) def _process_logs( self, logs: Sequence[DepositLog], block_number: BlockNumber @@ -388,8 +327,8 @@ def _get_accumulated_deposit_count(self, block_number: BlockNumber) -> int: Get the accumulated deposit count from deposit contract with `get_deposit_count` at block `block_number`. """ - deposit_count_bytes = self._deposit_contract.functions.get_deposit_count().call( - block_identifier=block_number + deposit_count_bytes = self._eth1_data_provider.get_deposit_count( + block_number=block_number, ) return int.from_bytes(deposit_count_bytes, "little") @@ -398,6 +337,6 @@ def _get_deposit_root_from_contract(self, block_number: BlockNumber) -> Hash32: Get the deposit root from deposit contract with `get_deposit_root` at block `block_number`. """ - return self._deposit_contract.functions.get_deposit_root().call( - block_identifier=block_number + return self._eth1_data_provider.get_deposit_root( + block_number=block_number, ) From 977c565f7a9c2afbf6e67f66fedc53bbfb51ec00 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 10 Dec 2019 15:51:37 +0800 Subject: [PATCH 04/18] Fix eth1 monitor db initialization --- trinity/components/eth2/eth1_monitor/db.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/db.py b/trinity/components/eth2/eth1_monitor/db.py index 836de7180a..88d0374f54 100644 --- a/trinity/components/eth2/eth1_monitor/db.py +++ b/trinity/components/eth2/eth1_monitor/db.py @@ -114,16 +114,17 @@ def __init__( ) -> None: self.db = db self._deposit_count = self._get_deposit_count() - # If the parameter `highest_processed_block_number` is given, set it in the database. + latest_processed_block_number = self._get_highest_processed_block_number() if highest_processed_block_number is not None: if highest_processed_block_number < 0: raise DepositDataDBValidationError( "`highest_processed_block_number` should be non-negative: " f"highest_processed_block_number={highest_processed_block_number}" ) - self._set_highest_processed_block_number( - self.db, highest_processed_block_number - ) + elif highest_processed_block_number > latest_processed_block_number: + self._set_highest_processed_block_number( + self.db, highest_processed_block_number + ) self._highest_processed_block_number = ( self._get_highest_processed_block_number() ) From 16dbf12161c06e03711fcdc3be765bbc077fa65a Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 10 Dec 2019 20:55:35 +0800 Subject: [PATCH 05/18] Add `FakeEth1DataProvider` --- .../eth2/eth1_monitor/eth1_data_provider.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 9a91e1a365..a2155bdabe 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +import time from typing import Any, Dict, NamedTuple, Tuple, Union from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 @@ -7,6 +8,8 @@ from web3 import Web3 from web3.utils.events import get_event_data +from eth2._utils.hash import hash_eth2 +from eth2.beacon.constants import GWEI_PER_ETH from eth2.beacon.typing import Gwei, Timestamp @@ -56,6 +59,71 @@ def get_deposit_root(self, block_number: BlockNumber) -> Hash32: ... +AVERAGE_BLOCK_TIME = 20 + + +class FakeEth1DataProvider(BaseEth1DataProvider): + + start_block_number: BlockNumber + start_block_timestamp: Timestamp + + num_deposits_per_block: int + + def __init__( + self, + start_block_number: BlockNumber, + start_block_timestamp: Timestamp, + num_deposits_per_block: int, + ) -> None: + self.start_block_number = start_block_number + self.start_block_timestamp = start_block_timestamp + self.num_deposits_per_block = num_deposits_per_block + + def get_block(self, arg: Union[BlockNumber, str]) -> Eth1Block: + if isinstance(arg, int): + block_time = ( + self.start_block_timestamp + (arg - self.start_block_number) * AVERAGE_BLOCK_TIME + ) + return Eth1Block( + block_hash=hash_eth2(int(arg).to_bytes(4, byteorder='big')), + number=arg, + timestamp=Timestamp(block_time), + ) + else: + # Assume `arg` == 'latest' + current_time = int(time.time()) + num_blocks_inbetween = (current_time - self.start_block_timestamp) // AVERAGE_BLOCK_TIME + block_number = self.start_block_number + num_blocks_inbetween + return Eth1Block( + block_hash=hash_eth2(block_number.to_bytes(4, byteorder='big')), + number=BlockNumber(block_number), + timestamp=Timestamp(current_time - (current_time % AVERAGE_BLOCK_TIME)), + ) + + def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: + logs = [] + for _ in range(self.num_deposits_per_block): + log = DepositLog( + block_hash=hash_eth2(block_number.to_bytes(4, byteorder='big')), + pubkey=BLSPubkey(b'\x12' * 48), + withdrawal_credentials=Hash32(b'\x23' * 32), + signature=BLSSignature(b'\x34' * 96), + amount=32 * GWEI_PER_ETH, + ) + logs.append(log) + return tuple(logs) + + def get_deposit_count(self, block_number: BlockNumber) -> bytes: + deposit_count = self.num_deposits_per_block * (block_number - self.start_block_number) + return deposit_count.to_bytes(4, byteorder='big') + + def get_deposit_root(self, block_number: BlockNumber) -> Hash32: + block_root = hash_eth2(block_number.to_bytes(4, byteorder='big')) + return hash_eth2( + block_root + self.get_deposit_count(block_number) + ) + + class Web3Eth1DataProvider(BaseEth1DataProvider): w3: Web3 From 99fd3d2f2195e277b7c3e159b2dd47c7d65591e4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 10 Dec 2019 20:57:50 +0800 Subject: [PATCH 06/18] Add `Eth1MonitorComponent` --- .../components/eth2/eth1_monitor/component.py | 90 +++++++++++++++++++ trinity/components/registry.py | 2 + 2 files changed, 92 insertions(+) create mode 100644 trinity/components/eth2/eth1_monitor/component.py diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py new file mode 100644 index 0000000000..5e3bd5cad8 --- /dev/null +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -0,0 +1,90 @@ +from argparse import ( + ArgumentParser, + _SubParsersAction, +) +import json +import time + +from async_service import Service, TrioManager + +from lahja import EndpointAPI + +# from web3 import Web3 + +from trinity.components.eth2.eth1_monitor.configs import deposit_contract_json +from trinity.components.eth2.eth1_monitor.eth1_data_provider import FakeEth1DataProvider +from trinity.config import BeaconAppConfig +from trinity.db.manager import DBClient +from trinity.events import ShutdownRequest +from trinity.extensibility import ( + TrioIsolatedComponent, +) + + +from .eth1_monitor import Eth1Monitor + + +# Fake eth1 monitor config +# TODO: These configs should be read from a config file, e.g., `eth1_monitor_config.yaml`. +DEPOSIT_CONTRACT_ABI = json.loads(deposit_contract_json)["abi"] +DEPOSIT_CONTRACT_ADDRESS = b"\x12" * 20 +NUM_BLOCKS_CONFIRMED = 100 +POLLING_PERIOD = 10 +START_BLOCK_NUMBER = 1 + +# Configs for fake Eth1DataProvider +NUM_DEPOSITS_PER_BLOCK = 5 +START_BLOCK_TIMESTAMP = int(time.time()) - 2100 # Around half an hour ago + + +class Eth1MonitorComponent(TrioIsolatedComponent): + + @property + def name(self) -> str: + return "Eth1 Monitor" + + def on_ready(self, manager_eventbus: EndpointAPI) -> None: + if self.boot_info.trinity_config.has_app_config(BeaconAppConfig): + self.start() + + @classmethod + def configure_parser(cls, + arg_parser: ArgumentParser, + subparser: _SubParsersAction) -> None: + # TODO: For now we use fake eth1 monitor. + pass + # arg_parser.add_argument( + # "--eth1client-rpc", + # help="RPC HTTP endpoint of Eth1 client ", + # ) + + async def run(self) -> None: + trinity_config = self.boot_info.trinity_config + + # TODO: For now we use fake eth1 monitor. + # if self.boot_info.args.eth1client_rpc: + # w3: Web3 = Web3.HTTPProvider(self.boot_info.args.eth1client_rpc) + # else: + # w3: Web3 = None + fake_eth1_data_provider = FakeEth1DataProvider( + start_block_number=START_BLOCK_NUMBER, + start_block_timestamp=START_BLOCK_TIMESTAMP, + num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, + ) + + base_db = DBClient.connect(trinity_config.database_ipc_path) + + eth1_monitor_service: Service = Eth1Monitor( + eth1_data_provider=fake_eth1_data_provider, + num_blocks_confirmed=NUM_BLOCKS_CONFIRMED, + polling_period=POLLING_PERIOD, + start_block_number=START_BLOCK_NUMBER, + event_bus=self.event_bus, + base_db=base_db, + ) + + try: + await TrioManager.run_service(eth1_monitor_service) + except Exception: + await self.event_bus.broadcast(ShutdownRequest("Eth1 Monitor ended unexpectedly")) + raise diff --git a/trinity/components/registry.py b/trinity/components/registry.py index cf179336e5..79e3ef89ae 100644 --- a/trinity/components/registry.py +++ b/trinity/components/registry.py @@ -49,6 +49,7 @@ UpnpComponent, ) from trinity.components.eth2.beacon.component import BeaconNodeComponent +from trinity.components.eth2.eth1_monitor.component import Eth1MonitorComponent from trinity.components.eth2.interop.component import InteropComponent from trinity.components.builtin.tx_pool.component import ( TxComponent, @@ -68,6 +69,7 @@ BEACON_NODE_COMPONENTS: Tuple[Type[BaseComponentAPI], ...] = ( BeaconNodeComponent, InteropComponent, + Eth1MonitorComponent, ) From 7aca352a7037961692a7f2337d53832480f63ba8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 10 Dec 2019 21:10:29 +0800 Subject: [PATCH 07/18] Add some logging to eth1 monitor --- trinity/components/eth2/eth1_monitor/eth1_monitor.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index 0e52c02278..de161ef509 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -20,6 +20,7 @@ from web3 import Web3 from eth.abc import AtomicDatabaseAPI +from eth_utils import encode_hex from eth2.beacon.typing import Timestamp from eth2.beacon.types.deposits import Deposit @@ -58,7 +59,7 @@ def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: class Eth1Monitor(Service): - logger = logging.getLogger('trinity.eth1_monitor') + logger = logging.getLogger('trinity.Eth1Monitor') _eth1_data_provider: BaseEth1DataProvider @@ -123,6 +124,11 @@ async def _handle_new_logs(self) -> None: async for block in self._new_blocks(): self._handle_block_data(block) logs = self._get_logs_from_block(block.number) + self.logger.info( + "Eth1 Monitor got new eth1 block: %s, number of logs contained in the block: %s", + block, + len(logs), + ) self._process_logs(logs, block.number) def _handle_get_deposit(self, req: GetDepositRequest) -> GetDepositResponse: @@ -225,6 +231,7 @@ async def _run_handle_request( self, event_type: Type[TRequest], event_handler: Callable[[TRequest], Any] ) -> None: async for req in self._event_bus.stream(event_type): + self.logger.info("Monitor receive deposit data request: %s", req) try: resp = event_handler(req) except Exception as e: From b4ada6683bd42f5aa51a6e6c0cfa4ffa862a3871 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 12 Dec 2019 18:23:20 +0800 Subject: [PATCH 08/18] Add GetDistanceRequest/Response --- trinity/components/eth2/eth1_monitor/events.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/trinity/components/eth2/eth1_monitor/events.py b/trinity/components/eth2/eth1_monitor/events.py index cfe565af27..d9a75547da 100644 --- a/trinity/components/eth2/eth1_monitor/events.py +++ b/trinity/components/eth2/eth1_monitor/events.py @@ -4,7 +4,7 @@ from lahja import BaseEvent, BaseRequestResponseEvent -from eth_typing import BlockNumber +from eth_typing import BlockNumber, Hash32 from eth2.beacon.typing import Timestamp from eth2.beacon.types.deposits import Deposit @@ -58,3 +58,19 @@ class GetEth1DataRequest(BaseRequestResponseEvent[GetEth1DataResponse]): @staticmethod def expected_response_type() -> Type[GetEth1DataResponse]: return GetEth1DataResponse + + +@dataclass +class GetDistanceResponse(BaseEvent): + distance: int + error: Exception = None + + +@dataclass +class GetDistanceRequest(BaseRequestResponseEvent[GetDistanceResponse]): + block_hash: Hash32 + eth1_voting_period_start_timestamp: Timestamp + + @staticmethod + def expected_response_type() -> Type[GetDistanceResponse]: + return GetDistanceResponse From ca4f9d7d18b9c11698e1bfd27a56773446391c5f Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 12 Dec 2019 18:27:19 +0800 Subject: [PATCH 09/18] Add `_handle_get_distance` in eth1 monitor --- .../eth2/eth1_monitor/eth1_monitor.py | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index de161ef509..d4369061c3 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -34,6 +34,8 @@ from .db import BaseDepositDataDB, ListCachedDepositDataDB from .eth1_data_provider import BaseEth1DataProvider, DepositLog, Eth1Block from .events import ( + GetDistanceRequest, + GetDistanceResponse, GetDepositResponse, GetDepositRequest, GetEth1DataRequest, @@ -109,6 +111,9 @@ def highest_processed_block_number(self) -> BlockNumber: async def run(self) -> None: self.logger.info("Eth1 Monitor up") self.manager.run_daemon_task(self._handle_new_logs) + self.manager.run_daemon_task( + self._run_handle_request, *(GetDistanceRequest, self._handle_get_distance) + ) self.manager.run_daemon_task( self._run_handle_request, *(GetDepositRequest, self._handle_get_deposit) ) @@ -117,6 +122,20 @@ async def run(self) -> None: ) await self.manager.wait_finished() + def _handle_get_distance(self, req: GetDistanceRequest) -> GetDistanceResponse: + """ + Handle requests for `get_distance` from the event bus. + """ + block = self._eth1_data_provider.get_block(req.block_hash) + if block is None: + raise Eth1MonitorValidationError( + f"Block does not exist for block_hash={req.block_hash}" + ) + eth1_voting_period_start_block_number = self._get_closest_eth1_voting_period_start_block( + req.eth1_voting_period_start_timestamp, + ) + return GetDistanceResponse(distance=(eth1_voting_period_start_block_number - block.number)) + async def _handle_new_logs(self) -> None: """ Handle new blocks and the logs of them. @@ -172,7 +191,12 @@ def _get_eth1_data( f"`distance`={distance}, ", f"eth1_voting_period_start_block_number={eth1_voting_period_start_block_number}", ) - block_hash = self._eth1_data_provider.get_block(target_block_number).block_hash + block = self._eth1_data_provider.get_block(target_block_number) + if block is None: + raise Eth1MonitorValidationError( + f"Block does not exist for block number={target_block_number}" + ) + block_hash = block.block_hash # `Eth1Data.deposit_count`: get the `deposit_count` corresponding to the block. accumulated_deposit_count = self._get_accumulated_deposit_count( target_block_number @@ -235,9 +259,14 @@ async def _run_handle_request( try: resp = event_handler(req) except Exception as e: - await self._event_bus.broadcast( - req.expected_response_type()(None, None, e), req.broadcast_config() - ) + if event_type is GetDistanceRequest: + await self._event_bus.broadcast( + req.expected_response_type()(None, e), req.broadcast_config() + ) + else: + await self._event_bus.broadcast( + req.expected_response_type()(None, None, e), req.broadcast_config() + ) else: await self._event_bus.broadcast(resp, req.broadcast_config()) @@ -248,14 +277,22 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: """ while True: block = self._eth1_data_provider.get_block("latest") + if block is None: + raise Eth1MonitorValidationError(f"Fail to get latest block") target_block_number = BlockNumber(block.number - self._num_blocks_confirmed) from_block_number = self.highest_processed_block_number + self.logger.info("Eth1 Data Provider latest block: %s, %s, %s", block.number, target_block_number, from_block_number) if target_block_number > from_block_number: # From `highest_processed_block_number` to `target_block_number` for block_number in range( from_block_number + 1, target_block_number + 1 ): - yield self._eth1_data_provider.get_block(BlockNumber(block_number)) + block = self._eth1_data_provider.get_block(BlockNumber(block_number)) + if block is None: + raise Eth1MonitorValidationError( + f"Block does not exist for block number={block_number}" + ) + yield block await trio.sleep(self._polling_period) def _handle_block_data(self, block: Eth1Block) -> None: From c3812bf150b940b934a5d3da23cbef044a8b743c Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 12 Dec 2019 18:28:03 +0800 Subject: [PATCH 10/18] Add `GetDistanceRequest` test --- tests-trio/eth1-monitor/test_eth1_monitor.py | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests-trio/eth1-monitor/test_eth1_monitor.py b/tests-trio/eth1-monitor/test_eth1_monitor.py index a71aad9e85..f51708ade3 100644 --- a/tests-trio/eth1-monitor/test_eth1_monitor.py +++ b/tests-trio/eth1-monitor/test_eth1_monitor.py @@ -10,6 +10,7 @@ from eth2._utils.merkle.common import verify_merkle_branch from trinity.components.eth2.eth1_monitor.eth1_monitor import ( make_deposit_tree_and_root, + GetDistanceRequest, GetEth1DataRequest, GetDepositRequest, Eth1Monitor, @@ -277,6 +278,8 @@ async def test_ipc( async def request(request_type, **kwargs): await endpoint_client.wait_until_any_endpoint_subscribed_to(request_type) resp = await endpoint_client.request(request_type(**kwargs), broadcast_config) + if request_type is GetDistanceRequest: + return resp return resp.to_data() # Result from IPC should be the same as the direct call with the same args. @@ -308,3 +311,27 @@ async def request(request_type, **kwargs): } with pytest.raises(Eth1MonitorValidationError): await request(GetEth1DataRequest, **get_eth1_data_kwargs_fails) + + # Test: `get_distance` + # Fast forward for a few blocks + tester.mine_blocks(5) + latest_block = w3.eth.getBlock("latest") + latest_confirmed_block_number = latest_block.number - num_blocks_confirmed + latest_confirmed_block = w3.eth.getBlock(latest_confirmed_block_number) + eth1_voting_period_start_timestamp = latest_confirmed_block["timestamp"] + for distance in range(latest_confirmed_block_number): + block = w3.eth.getBlock(latest_confirmed_block_number - distance) + get_distance_kwargs = { + "block_hash": block["hash"], + "eth1_voting_period_start_timestamp": eth1_voting_period_start_timestamp, + } + resp = await request(GetDistanceRequest, **get_distance_kwargs) + assert distance == resp.distance + assert resp.error is None + # Fails + get_distance_fail_kwargs = { + "block_hash": b'\x12' * 32, + "eth1_voting_period_start_timestamp": eth1_voting_period_start_timestamp, + } + resp = await request(GetDistanceRequest, **get_distance_fail_kwargs) + assert resp.error is not None From 8d6b7bb68361879b9214e0c840f7b46413944e1d Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Dec 2019 23:20:20 +0800 Subject: [PATCH 11/18] Refactor `FakeEth1DataProvider` --- .../components/eth2/eth1_monitor/component.py | 20 +++- .../eth2/eth1_monitor/eth1_data_provider.py | 91 +++++++++++++++---- .../eth2/eth1_monitor/eth1_monitor.py | 26 +++--- 3 files changed, 101 insertions(+), 36 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py index 5e3bd5cad8..614f0d10cf 100644 --- a/trinity/components/eth2/eth1_monitor/component.py +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -7,10 +7,13 @@ from async_service import Service, TrioManager +from eth_typing import BlockNumber + from lahja import EndpointAPI # from web3 import Web3 +from eth2.beacon.typing import Timestamp from trinity.components.eth2.eth1_monitor.configs import deposit_contract_json from trinity.components.eth2.eth1_monitor.eth1_data_provider import FakeEth1DataProvider from trinity.config import BeaconAppConfig @@ -30,11 +33,11 @@ DEPOSIT_CONTRACT_ADDRESS = b"\x12" * 20 NUM_BLOCKS_CONFIRMED = 100 POLLING_PERIOD = 10 -START_BLOCK_NUMBER = 1 +START_BLOCK_NUMBER = BlockNumber(1000) # Configs for fake Eth1DataProvider -NUM_DEPOSITS_PER_BLOCK = 5 -START_BLOCK_TIMESTAMP = int(time.time()) - 2100 # Around half an hour ago +NUM_DEPOSITS_PER_BLOCK = 0 +START_BLOCK_TIMESTAMP = Timestamp(int(time.time()) - 2100) # Around 45 mins ago class Eth1MonitorComponent(TrioIsolatedComponent): @@ -66,14 +69,21 @@ async def run(self) -> None: # w3: Web3 = Web3.HTTPProvider(self.boot_info.args.eth1client_rpc) # else: # w3: Web3 = None + beacon_app_config = trinity_config.get_app_config(BeaconAppConfig) + base_db = DBClient.connect(trinity_config.database_ipc_path) + chain_config = beacon_app_config.get_chain_config() + chain = chain_config.beacon_chain_class( + base_db, + chain_config.genesis_config + ) + state = chain.get_state_by_slot(chain_config.genesis_config.GENESIS_SLOT) fake_eth1_data_provider = FakeEth1DataProvider( start_block_number=START_BLOCK_NUMBER, start_block_timestamp=START_BLOCK_TIMESTAMP, num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, + num_initial_deposits=state.eth1_data.deposit_count, ) - base_db = DBClient.connect(trinity_config.database_ipc_path) - eth1_monitor_service: Service = Eth1Monitor( eth1_data_provider=fake_eth1_data_provider, num_blocks_confirmed=NUM_BLOCKS_CONFIRMED, diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index a2155bdabe..643d317d28 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod +import logging import time -from typing import Any, Dict, NamedTuple, Tuple, Union +from typing import Any, Dict, NamedTuple, Optional, Tuple, Union from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 @@ -11,6 +12,7 @@ from eth2._utils.hash import hash_eth2 from eth2.beacon.constants import GWEI_PER_ETH from eth2.beacon.typing import Gwei, Timestamp +from trinity.components.eth2.beacon.validator import ETH1_FOLLOW_DISTANCE class Eth1Block(NamedTuple): @@ -43,7 +45,7 @@ def from_contract_log_dict(cls, log: Dict[Any, Any]) -> "DepositLog": class BaseEth1DataProvider(ABC): @abstractmethod - def get_block(self, arg: Union[BlockNumber, str]) -> Eth1Block: + def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: ... @abstractmethod @@ -69,56 +71,101 @@ class FakeEth1DataProvider(BaseEth1DataProvider): num_deposits_per_block: int + num_initial_deposits: int + def __init__( self, start_block_number: BlockNumber, start_block_timestamp: Timestamp, num_deposits_per_block: int, + num_initial_deposits: int, ) -> None: self.start_block_number = start_block_number self.start_block_timestamp = start_block_timestamp self.num_deposits_per_block = num_deposits_per_block + self.num_initial_deposits = num_initial_deposits + + @property + def is_fake_provider(self) -> bool: + return True - def get_block(self, arg: Union[BlockNumber, str]) -> Eth1Block: + def _get_latest_block_number(self) -> BlockNumber: + current_time = int(time.time()) + num_blocks_inbetween = (current_time - self.start_block_timestamp) // AVERAGE_BLOCK_TIME + return BlockNumber(self.start_block_number + num_blocks_inbetween) + + def _get_block_time(self, block_number: BlockNumber) -> Timestamp: + return Timestamp( + self.start_block_timestamp + + (block_number - self.start_block_number) * AVERAGE_BLOCK_TIME + ) + + def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: + # If `arg` is block number if isinstance(arg, int): - block_time = ( - self.start_block_timestamp + (arg - self.start_block_number) * AVERAGE_BLOCK_TIME - ) + block_time = self._get_block_time(BlockNumber(arg)) return Eth1Block( - block_hash=hash_eth2(int(arg).to_bytes(4, byteorder='big')), + block_hash=int(arg).to_bytes(32, byteorder='big'), number=arg, timestamp=Timestamp(block_time), ) + # If `arg` is block hash + elif isinstance(arg, bytes): + block_number = int.from_bytes(arg, byteorder='big') + latest_block_number = self._get_latest_block_number() + # Check if provided block number is in valid range + earliest_follow_block_number = self.start_block_number - ETH1_FOLLOW_DISTANCE + is_beyond_follow_distance = block_number < earliest_follow_block_number + if (is_beyond_follow_distance or block_number > latest_block_number): + # If provided block number does not make sense, + # assume it's the block at `earliest_follow_block_number`. + return Eth1Block( + block_hash=earliest_follow_block_number.to_bytes(32, byteorder='big'), + number=BlockNumber(earliest_follow_block_number), + timestamp=Timestamp( + self.start_block_timestamp - ETH1_FOLLOW_DISTANCE * AVERAGE_BLOCK_TIME, + ), + ) + block_time = self._get_block_time(block_number) + return Eth1Block( + block_hash=arg, + number=BlockNumber(block_number), + timestamp=Timestamp(block_time), + ) else: # Assume `arg` == 'latest' - current_time = int(time.time()) - num_blocks_inbetween = (current_time - self.start_block_timestamp) // AVERAGE_BLOCK_TIME - block_number = self.start_block_number + num_blocks_inbetween + latest_block_number = self._get_latest_block_number() + block_time = self._get_block_time(latest_block_number) return Eth1Block( - block_hash=hash_eth2(block_number.to_bytes(4, byteorder='big')), - number=BlockNumber(block_number), - timestamp=Timestamp(current_time - (current_time % AVERAGE_BLOCK_TIME)), + block_hash=latest_block_number.to_bytes(32, byteorder='big'), + number=BlockNumber(latest_block_number), + timestamp=block_time, ) def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: logs = [] for _ in range(self.num_deposits_per_block): log = DepositLog( - block_hash=hash_eth2(block_number.to_bytes(4, byteorder='big')), + block_hash=block_number.to_bytes(32, byteorder='big'), pubkey=BLSPubkey(b'\x12' * 48), withdrawal_credentials=Hash32(b'\x23' * 32), signature=BLSSignature(b'\x34' * 96), - amount=32 * GWEI_PER_ETH, + amount=Gwei(32 * GWEI_PER_ETH), ) logs.append(log) return tuple(logs) def get_deposit_count(self, block_number: BlockNumber) -> bytes: - deposit_count = self.num_deposits_per_block * (block_number - self.start_block_number) - return deposit_count.to_bytes(4, byteorder='big') + if block_number <= self.start_block_number: + return self.num_initial_deposits.to_bytes(32, byteorder='little') + deposit_count = ( + self.num_initial_deposits + + (block_number - self.start_block_number) * self.num_deposits_per_block + ) + return deposit_count.to_bytes(32, byteorder='little') def get_deposit_root(self, block_number: BlockNumber) -> Hash32: - block_root = hash_eth2(block_number.to_bytes(4, byteorder='big')) + block_root = block_number.to_bytes(32, byteorder='big') return hash_eth2( block_root + self.get_deposit_count(block_number) ) @@ -149,8 +196,14 @@ def __init__( event_abi_to_log_topic(self._deposit_event_abi) ) - def get_block(self, arg: Union[int, str]) -> Eth1Block: + @property + def is_fake_provider(self) -> bool: + return False + + def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: block_dict = self.w3.eth.getBlock(arg) + if block_dict is None: + return None return Eth1Block( block_hash=Hash32(block_dict["hash"]), number=BlockNumber(block_dict["number"]), diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index d4369061c3..34b48d8871 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -20,7 +20,8 @@ from web3 import Web3 from eth.abc import AtomicDatabaseAPI -from eth_utils import encode_hex + +from eth_utils import humanize_hash from eth2.beacon.typing import Timestamp from eth2.beacon.types.deposits import Deposit @@ -61,7 +62,6 @@ def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: class Eth1Monitor(Service): - logger = logging.getLogger('trinity.Eth1Monitor') _eth1_data_provider: BaseEth1DataProvider @@ -129,7 +129,7 @@ def _handle_get_distance(self, req: GetDistanceRequest) -> GetDistanceResponse: block = self._eth1_data_provider.get_block(req.block_hash) if block is None: raise Eth1MonitorValidationError( - f"Block does not exist for block_hash={req.block_hash}" + f"Block does not exist for block_hash={humanize_hash(req.block_hash)}" ) eth1_voting_period_start_block_number = self._get_closest_eth1_voting_period_start_block( req.eth1_voting_period_start_timestamp, @@ -205,6 +205,7 @@ def _get_eth1_data( raise Eth1MonitorValidationError( f"failed to make `Eth1Data`: `deposit_count = 0` at block #{target_block_number}" ) + # Verify that the deposit data in db and the deposit data in contract match deposit_data_in_range = self._db.get_deposit_data_range( 0, accumulated_deposit_count ) @@ -212,14 +213,16 @@ def _get_eth1_data( contract_deposit_root = self._get_deposit_root_from_contract( target_block_number ) - if contract_deposit_root != deposit_root: - raise DepositDataCorrupted( - "deposit root built locally mismatches the one in the contract on chain: " - f"contract_deposit_root={contract_deposit_root.hex()}, " - f"deposit_root={deposit_root.hex()}" - ) + # TODO: Remove this if we no longer need a fake provider + if not self._eth1_data_provider.is_fake_provider: + if contract_deposit_root != deposit_root: + raise DepositDataCorrupted( + "deposit root built locally mismatches the one in the contract on chain: " + f"contract_deposit_root={contract_deposit_root.hex()}, " + f"deposit_root={deposit_root.hex()}" + ) return Eth1Data( - deposit_root=deposit_root, + deposit_root=contract_deposit_root, deposit_count=accumulated_deposit_count, block_hash=block_hash, ) @@ -255,11 +258,10 @@ async def _run_handle_request( self, event_type: Type[TRequest], event_handler: Callable[[TRequest], Any] ) -> None: async for req in self._event_bus.stream(event_type): - self.logger.info("Monitor receive deposit data request: %s", req) try: resp = event_handler(req) except Exception as e: - if event_type is GetDistanceRequest: + if isinstance(req, GetDistanceRequest): await self._event_bus.broadcast( req.expected_response_type()(None, e), req.broadcast_config() ) From 35d7d78ad718877d913b646319b9ddc050e82ef2 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Dec 2019 15:20:13 +0800 Subject: [PATCH 12/18] Update `Eth1MonitorComponent` api --- .../components/eth2/eth1_monitor/component.py | 72 ++++++++++--------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py index 614f0d10cf..77d26e12c9 100644 --- a/trinity/components/eth2/eth1_monitor/component.py +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -14,6 +14,7 @@ # from web3 import Web3 from eth2.beacon.typing import Timestamp +from trinity.boot_info import BootInfo from trinity.components.eth2.eth1_monitor.configs import deposit_contract_json from trinity.components.eth2.eth1_monitor.eth1_data_provider import FakeEth1DataProvider from trinity.config import BeaconAppConfig @@ -42,13 +43,12 @@ class Eth1MonitorComponent(TrioIsolatedComponent): - @property - def name(self) -> str: - return "Eth1 Monitor" + name = "Eth1 Monitor" + endpoint_name = "eth1-monitor" - def on_ready(self, manager_eventbus: EndpointAPI) -> None: - if self.boot_info.trinity_config.has_app_config(BeaconAppConfig): - self.start() + @property + def is_enabled(self) -> bool: + return self._boot_info.trinity_config.has_app_config(BeaconAppConfig) @classmethod def configure_parser(cls, @@ -61,8 +61,9 @@ def configure_parser(cls, # help="RPC HTTP endpoint of Eth1 client ", # ) - async def run(self) -> None: - trinity_config = self.boot_info.trinity_config + @classmethod + async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: + trinity_config = boot_info.trinity_config # TODO: For now we use fake eth1 monitor. # if self.boot_info.args.eth1client_rpc: @@ -71,30 +72,31 @@ async def run(self) -> None: # w3: Web3 = None beacon_app_config = trinity_config.get_app_config(BeaconAppConfig) base_db = DBClient.connect(trinity_config.database_ipc_path) - chain_config = beacon_app_config.get_chain_config() - chain = chain_config.beacon_chain_class( - base_db, - chain_config.genesis_config - ) - state = chain.get_state_by_slot(chain_config.genesis_config.GENESIS_SLOT) - fake_eth1_data_provider = FakeEth1DataProvider( - start_block_number=START_BLOCK_NUMBER, - start_block_timestamp=START_BLOCK_TIMESTAMP, - num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, - num_initial_deposits=state.eth1_data.deposit_count, - ) - - eth1_monitor_service: Service = Eth1Monitor( - eth1_data_provider=fake_eth1_data_provider, - num_blocks_confirmed=NUM_BLOCKS_CONFIRMED, - polling_period=POLLING_PERIOD, - start_block_number=START_BLOCK_NUMBER, - event_bus=self.event_bus, - base_db=base_db, - ) - - try: - await TrioManager.run_service(eth1_monitor_service) - except Exception: - await self.event_bus.broadcast(ShutdownRequest("Eth1 Monitor ended unexpectedly")) - raise + with base_db: + chain_config = beacon_app_config.get_chain_config() + chain = chain_config.beacon_chain_class( + base_db, + chain_config.genesis_config, + ) + state = chain.get_state_by_slot(chain_config.genesis_config.GENESIS_SLOT) + fake_eth1_data_provider = FakeEth1DataProvider( + start_block_number=START_BLOCK_NUMBER, + start_block_timestamp=START_BLOCK_TIMESTAMP, + num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, + num_initial_deposits=state.eth1_data.deposit_count, + ) + + eth1_monitor_service: Service = Eth1Monitor( + eth1_data_provider=fake_eth1_data_provider, + num_blocks_confirmed=NUM_BLOCKS_CONFIRMED, + polling_period=POLLING_PERIOD, + start_block_number=START_BLOCK_NUMBER, + event_bus=event_bus, + base_db=base_db, + ) + + try: + await TrioManager.run_service(eth1_monitor_service) + except Exception: + await event_bus.broadcast(ShutdownRequest("Eth1 Monitor ended unexpectedly")) + raise From 97f04542aaa8a657cdc0f7ecf0f88feeb6704404 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Dec 2019 17:49:55 +0800 Subject: [PATCH 13/18] Hardcode initial deposit into `FakeEth1DataProvider` --- .../components/eth2/eth1_monitor/component.py | 53 ++++++++++++++----- .../eth2/eth1_monitor/eth1_data_provider.py | 25 +++++++-- .../eth2/eth1_monitor/eth1_monitor.py | 2 +- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py index 77d26e12c9..7ff50e48b8 100644 --- a/trinity/components/eth2/eth1_monitor/component.py +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -3,16 +3,21 @@ _SubParsersAction, ) import json +from pathlib import Path import time from async_service import Service, TrioManager from eth_typing import BlockNumber +from eth_utils import decode_hex from lahja import EndpointAPI # from web3 import Web3 +from eth2._utils.hash import hash_eth2 +from eth2.beacon.tools.builder.validator import create_mock_deposit_data +from eth2.beacon.tools.fixtures.loading import load_yaml_at from eth2.beacon.typing import Timestamp from trinity.boot_info import BootInfo from trinity.components.eth2.eth1_monitor.configs import deposit_contract_json @@ -24,7 +29,6 @@ TrioIsolatedComponent, ) - from .eth1_monitor import Eth1Monitor @@ -64,26 +68,51 @@ def configure_parser(cls, @classmethod async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: trinity_config = boot_info.trinity_config + beacon_app_config = trinity_config.get_app_config(BeaconAppConfig) + chain_config = beacon_app_config.get_chain_config() + base_db = DBClient.connect(trinity_config.database_ipc_path) # TODO: For now we use fake eth1 monitor. - # if self.boot_info.args.eth1client_rpc: - # w3: Web3 = Web3.HTTPProvider(self.boot_info.args.eth1client_rpc) + # if boot_info.args.eth1client_rpc: + # w3: Web3 = Web3.HTTPProvider(boot_info.args.eth1client_rpc) # else: # w3: Web3 = None - beacon_app_config = trinity_config.get_app_config(BeaconAppConfig) - base_db = DBClient.connect(trinity_config.database_ipc_path) - with base_db: - chain_config = beacon_app_config.get_chain_config() - chain = chain_config.beacon_chain_class( - base_db, - chain_config.genesis_config, + + # TODO: For now we use fake eth1 monitor. So we load validators data from + # interop setting and hardcode the deposit data into fake eth1 data provider. + chain = chain_config.beacon_chain_class( + base_db, + chain_config.genesis_config, + ) + config = chain.get_state_machine().config + # Below code snippet is copied from generate_beacon_genesis.py + key_set = load_yaml_at( + Path('eth2/beacon/scripts/quickstart_state/keygen_16_validators.yaml') + ) + initial_deposits = () + for key_pair in key_set: + pubkey = decode_hex(key_pair["pubkey"]) + privkey = int.from_bytes(decode_hex(key_pair["privkey"]), "big") + withdrawal_credential = ( + config.BLS_WITHDRAWAL_PREFIX.to_bytes(1, byteorder="big") + + hash_eth2(pubkey)[1:] + ) + + deposit_data = create_mock_deposit_data( + config=config, + pubkey=pubkey, + privkey=privkey, + withdrawal_credentials=withdrawal_credential, ) - state = chain.get_state_by_slot(chain_config.genesis_config.GENESIS_SLOT) + + initial_deposits += (deposit_data,) + + with base_db: fake_eth1_data_provider = FakeEth1DataProvider( start_block_number=START_BLOCK_NUMBER, start_block_timestamp=START_BLOCK_TIMESTAMP, num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, - num_initial_deposits=state.eth1_data.deposit_count, + initial_deposits=initial_deposits, ) eth1_monitor_service: Service = Eth1Monitor( diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 643d317d28..02d3e338fc 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -11,6 +11,7 @@ from eth2._utils.hash import hash_eth2 from eth2.beacon.constants import GWEI_PER_ETH +from eth2.beacon.types.deposit_data import DepositData from eth2.beacon.typing import Gwei, Timestamp from trinity.components.eth2.beacon.validator import ETH1_FOLLOW_DISTANCE @@ -71,6 +72,7 @@ class FakeEth1DataProvider(BaseEth1DataProvider): num_deposits_per_block: int + initial_deposits: Tuple[DepositData, ...] num_initial_deposits: int def __init__( @@ -78,13 +80,15 @@ def __init__( start_block_number: BlockNumber, start_block_timestamp: Timestamp, num_deposits_per_block: int, - num_initial_deposits: int, + initial_deposits: Tuple[DepositData, ...], ) -> None: self.start_block_number = start_block_number self.start_block_timestamp = start_block_timestamp self.num_deposits_per_block = num_deposits_per_block - self.num_initial_deposits = num_initial_deposits + self.initial_deposits = initial_deposits + self.num_initial_deposits = len(initial_deposits) + # TODO: Remove this if we no longer need a fake provider @property def is_fake_provider(self) -> bool: return True @@ -143,10 +147,24 @@ def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: ) def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: + block_hash = block_number.to_bytes(32, byteorder='big') + if block_number == self.start_block_number: + logs = [ + DepositLog( + block_hash=block_hash, + pubkey=deposit.pubkey, + withdrawal_credentials=deposit.withdrawal_credentials, + signature=deposit.signature, + amount=deposit.amount, + ) + for deposit in self.initial_deposits + ] + return logs + logs = [] for _ in range(self.num_deposits_per_block): log = DepositLog( - block_hash=block_number.to_bytes(32, byteorder='big'), + block_hash=block_hash, pubkey=BLSPubkey(b'\x12' * 48), withdrawal_credentials=Hash32(b'\x23' * 32), signature=BLSSignature(b'\x34' * 96), @@ -196,6 +214,7 @@ def __init__( event_abi_to_log_topic(self._deposit_event_abi) ) + # TODO: Remove this if we no longer need a fake provider @property def is_fake_provider(self) -> bool: return False diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index 34b48d8871..d31e57ecc6 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -222,7 +222,7 @@ def _get_eth1_data( f"deposit_root={deposit_root.hex()}" ) return Eth1Data( - deposit_root=contract_deposit_root, + deposit_root=deposit_root, deposit_count=accumulated_deposit_count, block_hash=block_hash, ) From 0dead1e8cbabf32060ec59c1a7b057a310d57051 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Dec 2019 15:23:06 +0800 Subject: [PATCH 14/18] Apply PR feedback: update FakeEth1DataProvider --- .../eth2/eth1_monitor/eth1_data_provider.py | 173 +++++++++--------- 1 file changed, 87 insertions(+), 86 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 02d3e338fc..518315b55c 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -62,6 +62,77 @@ def get_deposit_root(self, block_number: BlockNumber) -> Hash32: ... +class Web3Eth1DataProvider(BaseEth1DataProvider): + + w3: Web3 + + _deposit_contract: "Web3.eth.contract" + _deposit_event_abi: Dict[str, Any] + _deposit_event_topic: str + + def __init__( + self, + w3: Web3, + deposit_contract_address: Address, + deposit_contract_abi: Dict[str, Any], + ) -> None: + self.w3 = w3 + self._deposit_contract = self.w3.eth.contract( + address=deposit_contract_address, abi=deposit_contract_abi + ) + self._deposit_event_abi = ( + self._deposit_contract.events.DepositEvent._get_event_abi() + ) + self._deposit_event_topic = encode_hex( + event_abi_to_log_topic(self._deposit_event_abi) + ) + + # TODO: Remove this if we no longer need a fake provider + @property + def is_fake_provider(self) -> bool: + return False + + def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: + block_dict = self.w3.eth.getBlock(arg) + if block_dict is None: + return None + return Eth1Block( + block_hash=Hash32(block_dict["hash"]), + number=BlockNumber(block_dict["number"]), + timestamp=Timestamp(block_dict["timestamp"]), + ) + + def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: + # NOTE: web3 v4 does not support `contract.events.Event.getLogs`. + # After upgrading to v5, we can change to use the function. + logs = self.w3.eth.getLogs( + { + "fromBlock": block_number, + "toBlock": block_number, + "address": self._deposit_contract.address, + "topics": [self._deposit_event_topic], + } + ) + parsed_logs = tuple( + DepositLog.from_contract_log_dict( + get_event_data(self._deposit_event_abi, log) + ) + for log in logs + ) + return parsed_logs + + def get_deposit_count(self, block_number: BlockNumber) -> bytes: + return self._deposit_contract.functions.get_deposit_count().call( + block_identifier=block_number + ) + + def get_deposit_root(self, block_number: BlockNumber) -> Hash32: + return self._deposit_contract.functions.get_deposit_root().call( + block_identifier=block_number + ) + + +# NOTE: This constant is for `FakeEth1DataProvider` AVERAGE_BLOCK_TIME = 20 @@ -95,8 +166,8 @@ def is_fake_provider(self) -> bool: def _get_latest_block_number(self) -> BlockNumber: current_time = int(time.time()) - num_blocks_inbetween = (current_time - self.start_block_timestamp) // AVERAGE_BLOCK_TIME - return BlockNumber(self.start_block_number + num_blocks_inbetween) + distance = (current_time - self.start_block_timestamp) // AVERAGE_BLOCK_TIME + return BlockNumber(self.start_block_number + distance) def _get_block_time(self, block_number: BlockNumber) -> Timestamp: return Timestamp( @@ -149,7 +220,7 @@ def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: block_hash = block_number.to_bytes(32, byteorder='big') if block_number == self.start_block_number: - logs = [ + logs = ( DepositLog( block_hash=block_hash, pubkey=deposit.pubkey, @@ -158,20 +229,20 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: amount=deposit.amount, ) for deposit in self.initial_deposits - ] - return logs - - logs = [] - for _ in range(self.num_deposits_per_block): - log = DepositLog( - block_hash=block_hash, - pubkey=BLSPubkey(b'\x12' * 48), - withdrawal_credentials=Hash32(b'\x23' * 32), - signature=BLSSignature(b'\x34' * 96), - amount=Gwei(32 * GWEI_PER_ETH), ) - logs.append(log) - return tuple(logs) + return tuple(logs) + else: + logs = ( + DepositLog( + block_hash=block_hash, + pubkey=BLSPubkey(b'\x12' * 48), + withdrawal_credentials=Hash32(b'\x23' * 32), + signature=BLSSignature(b'\x34' * 96), + amount=Gwei(32 * GWEI_PER_ETH), + ) + for _ in range(self.num_deposits_per_block) + ) + return tuple(logs) def get_deposit_count(self, block_number: BlockNumber) -> bytes: if block_number <= self.start_block_number: @@ -187,73 +258,3 @@ def get_deposit_root(self, block_number: BlockNumber) -> Hash32: return hash_eth2( block_root + self.get_deposit_count(block_number) ) - - -class Web3Eth1DataProvider(BaseEth1DataProvider): - - w3: Web3 - - _deposit_contract: "Web3.eth.contract" - _deposit_event_abi: Dict[str, Any] - _deposit_event_topic: str - - def __init__( - self, - w3: Web3, - deposit_contract_address: Address, - deposit_contract_abi: Dict[str, Any], - ) -> None: - self.w3 = w3 - self._deposit_contract = self.w3.eth.contract( - address=deposit_contract_address, abi=deposit_contract_abi - ) - self._deposit_event_abi = ( - self._deposit_contract.events.DepositEvent._get_event_abi() - ) - self._deposit_event_topic = encode_hex( - event_abi_to_log_topic(self._deposit_event_abi) - ) - - # TODO: Remove this if we no longer need a fake provider - @property - def is_fake_provider(self) -> bool: - return False - - def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: - block_dict = self.w3.eth.getBlock(arg) - if block_dict is None: - return None - return Eth1Block( - block_hash=Hash32(block_dict["hash"]), - number=BlockNumber(block_dict["number"]), - timestamp=Timestamp(block_dict["timestamp"]), - ) - - def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: - # NOTE: web3 v4 does not support `contract.events.Event.getLogs`. - # After upgrading to v5, we can change to use the function. - logs = self.w3.eth.getLogs( - { - "fromBlock": block_number, - "toBlock": block_number, - "address": self._deposit_contract.address, - "topics": [self._deposit_event_topic], - } - ) - parsed_logs = tuple( - DepositLog.from_contract_log_dict( - get_event_data(self._deposit_event_abi, log) - ) - for log in logs - ) - return parsed_logs - - def get_deposit_count(self, block_number: BlockNumber) -> bytes: - return self._deposit_contract.functions.get_deposit_count().call( - block_identifier=block_number - ) - - def get_deposit_root(self, block_number: BlockNumber) -> Hash32: - return self._deposit_contract.functions.get_deposit_root().call( - block_identifier=block_number - ) From b338b5bc60fc1992b6838d7b44a8441f59902362 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Dec 2019 15:23:21 +0800 Subject: [PATCH 15/18] Apply PR feedback: refactor create keypair and mock withdrawal credential part --- .../generate_beacon_genesis.py | 31 +++++++------------ eth2/beacon/tools/builder/initializer.py | 24 +++++++++++++- .../components/eth2/eth1_monitor/component.py | 29 ++++++++--------- 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py b/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py index ec6288a4ca..385d3cff83 100644 --- a/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py +++ b/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py @@ -1,12 +1,13 @@ from pathlib import Path import time -from eth_utils import decode_hex import ssz -from eth2._utils.hash import hash_eth2 from eth2.beacon.genesis import initialize_beacon_state_from_eth1 -from eth2.beacon.tools.builder.initializer import create_mock_deposits_and_root +from eth2.beacon.tools.builder.initializer import ( + create_keypair_and_mock_withdraw_credentials, + create_mock_deposits_and_root, +) from eth2.beacon.tools.fixtures.config_types import Minimal from eth2.beacon.tools.fixtures.loading import load_config_at_path, load_yaml_at from eth2.beacon.tools.misc.ssz_vector import override_lengths @@ -30,22 +31,14 @@ def _main(): key_set = load_yaml_at(KEY_DIR / KEY_SET_FILE) - pubkeys = () - privkeys = () - withdrawal_credentials = () - keymap = {} - for key_pair in key_set: - pubkey = decode_hex(key_pair["pubkey"]) - privkey = int.from_bytes(decode_hex(key_pair["privkey"]), "big") - withdrawal_credential = ( - config.BLS_WITHDRAWAL_PREFIX.to_bytes(1, byteorder="big") - + hash_eth2(pubkey)[1:] - ) - - pubkeys += (pubkey,) - privkeys += (privkey,) - withdrawal_credentials += (withdrawal_credential,) - keymap[pubkey] = privkey + pubkeys, privkeys, withdrawal_credentials = create_keypair_and_mock_withdraw_credentials( + config, + key_set, + ) + keymap = { + pubkey: privkey + for pubkey, privkey in zip(pubkeys, privkeys) + } deposits, _ = create_mock_deposits_and_root( pubkeys, keymap, config, withdrawal_credentials diff --git a/eth2/beacon/tools/builder/initializer.py b/eth2/beacon/tools/builder/initializer.py index f03f233bbb..7ce0726c5c 100644 --- a/eth2/beacon/tools/builder/initializer.py +++ b/eth2/beacon/tools/builder/initializer.py @@ -1,7 +1,8 @@ -from typing import Dict, Sequence, Tuple, Type +from typing import Any, Dict, Sequence, Tuple, Type from eth.constants import ZERO_HASH32 from eth_typing import BLSPubkey, Hash32 +from eth_utils import decode_hex from py_ecc.optimized_bls12_381.optimized_curve import ( curve_order as BLS12_381_CURVE_ORDER, ) @@ -32,6 +33,27 @@ def generate_privkey_from_index(index: int) -> int: ) +def create_keypair_and_mock_withdraw_credentials( + config: Eth2Config, + key_set: Dict[str, Any], +) -> Tuple[Sequence[BLSPubkey], Sequence[int], Sequence[Hash32]]: + pubkeys = () + privkeys = () + withdrawal_credentials = () + for key_pair in key_set: + pubkey = decode_hex(key_pair["pubkey"]) + privkey = int.from_bytes(decode_hex(key_pair["privkey"]), "big") + withdrawal_credential = ( + config.BLS_WITHDRAWAL_PREFIX.to_bytes(1, byteorder="big") + + hash_eth2(pubkey)[1:] + ) + + pubkeys += (pubkey,) + privkeys += (privkey,) + withdrawal_credentials += (withdrawal_credential,) + return (pubkeys, privkeys, withdrawal_credentials) + + def create_mock_deposits_and_root( pubkeys: Sequence[BLSPubkey], keymap: Dict[BLSPubkey, int], diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py index 7ff50e48b8..96c4ca3fff 100644 --- a/trinity/components/eth2/eth1_monitor/component.py +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -9,13 +9,14 @@ from async_service import Service, TrioManager from eth_typing import BlockNumber -from eth_utils import decode_hex from lahja import EndpointAPI # from web3 import Web3 -from eth2._utils.hash import hash_eth2 +from eth2.beacon.tools.builder.initializer import ( + create_keypair_and_mock_withdraw_credentials, +) from eth2.beacon.tools.builder.validator import create_mock_deposit_data from eth2.beacon.tools.fixtures.loading import load_yaml_at from eth2.beacon.typing import Timestamp @@ -85,34 +86,30 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: chain_config.genesis_config, ) config = chain.get_state_machine().config - # Below code snippet is copied from generate_beacon_genesis.py key_set = load_yaml_at( Path('eth2/beacon/scripts/quickstart_state/keygen_16_validators.yaml') ) - initial_deposits = () - for key_pair in key_set: - pubkey = decode_hex(key_pair["pubkey"]) - privkey = int.from_bytes(decode_hex(key_pair["privkey"]), "big") - withdrawal_credential = ( - config.BLS_WITHDRAWAL_PREFIX.to_bytes(1, byteorder="big") - + hash_eth2(pubkey)[1:] - ) - - deposit_data = create_mock_deposit_data( + pubkeys, privkeys, withdrawal_credentials = create_keypair_and_mock_withdraw_credentials( + config, + key_set, + ) + initial_deposits = ( + create_mock_deposit_data( config=config, pubkey=pubkey, privkey=privkey, withdrawal_credentials=withdrawal_credential, ) - - initial_deposits += (deposit_data,) + for pubkey, privkey, withdrawal_credential in zip( + pubkeys, privkeys, withdrawal_credentials) + ) with base_db: fake_eth1_data_provider = FakeEth1DataProvider( start_block_number=START_BLOCK_NUMBER, start_block_timestamp=START_BLOCK_TIMESTAMP, num_deposits_per_block=NUM_DEPOSITS_PER_BLOCK, - initial_deposits=initial_deposits, + initial_deposits=tuple(initial_deposits), ) eth1_monitor_service: Service = Eth1Monitor( From 87093be7be3386cdbde814ad46ecab5729a59811 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Dec 2019 15:57:54 +0800 Subject: [PATCH 16/18] Also store deposit data in fake eth1 data provider --- .../eth2/eth1_monitor/eth1_data_provider.py | 47 ++++++++++++------- .../eth2/eth1_monitor/eth1_monitor.py | 14 +++--- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 518315b55c..1aac85051f 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -9,8 +9,8 @@ from web3 import Web3 from web3.utils.events import get_event_data -from eth2._utils.hash import hash_eth2 from eth2.beacon.constants import GWEI_PER_ETH +from eth2.beacon.tools.builder.validator import make_deposit_tree_and_root from eth2.beacon.types.deposit_data import DepositData from eth2.beacon.typing import Gwei, Timestamp from trinity.components.eth2.beacon.validator import ETH1_FOLLOW_DISTANCE @@ -43,6 +43,15 @@ def from_contract_log_dict(cls, log: Dict[Any, Any]) -> "DepositLog": ) +def convert_deposit_log_to_deposit_data(deposit_log: DepositLog) -> DepositData: + return DepositData( + pubkey=deposit_log.pubkey, + withdrawal_credentials=deposit_log.withdrawal_credentials, + amount=deposit_log.amount, + signature=deposit_log.signature, + ) + + class BaseEth1DataProvider(ABC): @abstractmethod @@ -87,11 +96,6 @@ def __init__( event_abi_to_log_topic(self._deposit_event_abi) ) - # TODO: Remove this if we no longer need a fake provider - @property - def is_fake_provider(self) -> bool: - return False - def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: block_dict = self.w3.eth.getBlock(arg) if block_dict is None: @@ -143,8 +147,9 @@ class FakeEth1DataProvider(BaseEth1DataProvider): num_deposits_per_block: int - initial_deposits: Tuple[DepositData, ...] + deposits: Tuple[DepositData, ...] num_initial_deposits: int + latest_processed_block_number: BlockNumber def __init__( self, @@ -156,13 +161,9 @@ def __init__( self.start_block_number = start_block_number self.start_block_timestamp = start_block_timestamp self.num_deposits_per_block = num_deposits_per_block - self.initial_deposits = initial_deposits + self.deposits = initial_deposits self.num_initial_deposits = len(initial_deposits) - - # TODO: Remove this if we no longer need a fake provider - @property - def is_fake_provider(self) -> bool: - return True + self.latest_processed_block_number = start_block_number def _get_latest_block_number(self) -> BlockNumber: current_time = int(time.time()) @@ -228,7 +229,7 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: signature=deposit.signature, amount=deposit.amount, ) - for deposit in self.initial_deposits + for deposit in self.deposits ) return tuple(logs) else: @@ -254,7 +255,17 @@ def get_deposit_count(self, block_number: BlockNumber) -> bytes: return deposit_count.to_bytes(32, byteorder='little') def get_deposit_root(self, block_number: BlockNumber) -> Hash32: - block_root = block_number.to_bytes(32, byteorder='big') - return hash_eth2( - block_root + self.get_deposit_count(block_number) - ) + # Check and update deposit data when deposit root is requested + if self.latest_processed_block_number < block_number: + for blk_number in range(self.latest_processed_block_number + 1, block_number + 1): + deposit_logs = self.get_logs(blk_number) + self.deposits += tuple( + convert_deposit_log_to_deposit_data(deposit_log) + for deposit_log in deposit_logs + ) + self.latest_processed_block_number = block_number + deposit_count_bytes = self.get_deposit_count(block_number) + deposit_count = int.from_bytes(deposit_count_bytes, byteorder='little') + deposits = self.deposits[:deposit_count] + _, deposit_root = make_deposit_tree_and_root(deposits) + return deposit_root diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index d31e57ecc6..c99e9cb451 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -213,14 +213,12 @@ def _get_eth1_data( contract_deposit_root = self._get_deposit_root_from_contract( target_block_number ) - # TODO: Remove this if we no longer need a fake provider - if not self._eth1_data_provider.is_fake_provider: - if contract_deposit_root != deposit_root: - raise DepositDataCorrupted( - "deposit root built locally mismatches the one in the contract on chain: " - f"contract_deposit_root={contract_deposit_root.hex()}, " - f"deposit_root={deposit_root.hex()}" - ) + if contract_deposit_root != deposit_root: + raise DepositDataCorrupted( + "deposit root built locally mismatches the one in the contract on chain: " + f"contract_deposit_root={contract_deposit_root.hex()}, " + f"deposit_root={deposit_root.hex()}" + ) return Eth1Data( deposit_root=deposit_root, deposit_count=accumulated_deposit_count, From 04dc677dc2514d19376d372bdef43576614862e6 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Dec 2019 16:10:19 +0800 Subject: [PATCH 17/18] Add hardcoded `ETH1_FOLLOW_DISTANCE` to validator.py and fix lint --- .../quickstart_state/generate_beacon_genesis.py | 8 ++------ eth2/beacon/tools/builder/initializer.py | 16 ++++++++-------- trinity/components/eth2/beacon/validator.py | 4 ++++ .../eth2/eth1_monitor/eth1_data_provider.py | 17 ++++++++--------- .../eth2/eth1_monitor/eth1_monitor.py | 2 +- 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py b/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py index 385d3cff83..a679234557 100644 --- a/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py +++ b/eth2/beacon/scripts/quickstart_state/generate_beacon_genesis.py @@ -32,13 +32,9 @@ def _main(): key_set = load_yaml_at(KEY_DIR / KEY_SET_FILE) pubkeys, privkeys, withdrawal_credentials = create_keypair_and_mock_withdraw_credentials( - config, - key_set, + config, key_set ) - keymap = { - pubkey: privkey - for pubkey, privkey in zip(pubkeys, privkeys) - } + keymap = {pubkey: privkey for pubkey, privkey in zip(pubkeys, privkeys)} deposits, _ = create_mock_deposits_and_root( pubkeys, keymap, config, withdrawal_credentials diff --git a/eth2/beacon/tools/builder/initializer.py b/eth2/beacon/tools/builder/initializer.py index 7ce0726c5c..c0c08c9156 100644 --- a/eth2/beacon/tools/builder/initializer.py +++ b/eth2/beacon/tools/builder/initializer.py @@ -34,16 +34,15 @@ def generate_privkey_from_index(index: int) -> int: def create_keypair_and_mock_withdraw_credentials( - config: Eth2Config, - key_set: Dict[str, Any], -) -> Tuple[Sequence[BLSPubkey], Sequence[int], Sequence[Hash32]]: - pubkeys = () - privkeys = () - withdrawal_credentials = () + config: Eth2Config, key_set: Sequence[Dict[str, Any]] +) -> Tuple[Tuple[BLSPubkey, ...], Tuple[int, ...], Tuple[Hash32, ...]]: + pubkeys: Tuple[BLSPubkey, ...] = () + privkeys: Tuple[int, ...] = () + withdrawal_credentials: Tuple[Hash32, ...] = () for key_pair in key_set: - pubkey = decode_hex(key_pair["pubkey"]) + pubkey = BLSPubkey(decode_hex(key_pair["pubkey"])) privkey = int.from_bytes(decode_hex(key_pair["privkey"]), "big") - withdrawal_credential = ( + withdrawal_credential = Hash32( config.BLS_WITHDRAWAL_PREFIX.to_bytes(1, byteorder="big") + hash_eth2(pubkey)[1:] ) @@ -51,6 +50,7 @@ def create_keypair_and_mock_withdraw_credentials( pubkeys += (pubkey,) privkeys += (privkey,) withdrawal_credentials += (withdrawal_credential,) + return (pubkeys, privkeys, withdrawal_credentials) diff --git a/trinity/components/eth2/beacon/validator.py b/trinity/components/eth2/beacon/validator.py index 8b5292cf2b..5b0f125189 100644 --- a/trinity/components/eth2/beacon/validator.py +++ b/trinity/components/eth2/beacon/validator.py @@ -80,6 +80,10 @@ GetReadyAttestationsFn = Callable[[Slot], Sequence[Attestation]] +# FIXME: Read this from validator config +ETH1_FOLLOW_DISTANCE = 16 + + class Validator(BaseService): chain: BaseBeaconChain p2p_node: Node diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index 1aac85051f..dbf7ffbd1e 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -import logging import time from typing import Any, Dict, NamedTuple, Optional, Tuple, Union @@ -181,8 +180,8 @@ def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: if isinstance(arg, int): block_time = self._get_block_time(BlockNumber(arg)) return Eth1Block( - block_hash=int(arg).to_bytes(32, byteorder='big'), - number=arg, + block_hash=Hash32(int(arg).to_bytes(32, byteorder='big')), + number=BlockNumber(arg), timestamp=Timestamp(block_time), ) # If `arg` is block hash @@ -196,13 +195,13 @@ def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: # If provided block number does not make sense, # assume it's the block at `earliest_follow_block_number`. return Eth1Block( - block_hash=earliest_follow_block_number.to_bytes(32, byteorder='big'), + block_hash=Hash32(earliest_follow_block_number.to_bytes(32, byteorder='big')), number=BlockNumber(earliest_follow_block_number), timestamp=Timestamp( self.start_block_timestamp - ETH1_FOLLOW_DISTANCE * AVERAGE_BLOCK_TIME, ), ) - block_time = self._get_block_time(block_number) + block_time = self._get_block_time(BlockNumber(block_number)) return Eth1Block( block_hash=arg, number=BlockNumber(block_number), @@ -213,7 +212,7 @@ def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: latest_block_number = self._get_latest_block_number() block_time = self._get_block_time(latest_block_number) return Eth1Block( - block_hash=latest_block_number.to_bytes(32, byteorder='big'), + block_hash=Hash32(latest_block_number.to_bytes(32, byteorder='big')), number=BlockNumber(latest_block_number), timestamp=block_time, ) @@ -223,7 +222,7 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: if block_number == self.start_block_number: logs = ( DepositLog( - block_hash=block_hash, + block_hash=Hash32(block_hash), pubkey=deposit.pubkey, withdrawal_credentials=deposit.withdrawal_credentials, signature=deposit.signature, @@ -235,7 +234,7 @@ def get_logs(self, block_number: BlockNumber) -> Tuple[DepositLog, ...]: else: logs = ( DepositLog( - block_hash=block_hash, + block_hash=Hash32(block_hash), pubkey=BLSPubkey(b'\x12' * 48), withdrawal_credentials=Hash32(b'\x23' * 32), signature=BLSSignature(b'\x34' * 96), @@ -258,7 +257,7 @@ def get_deposit_root(self, block_number: BlockNumber) -> Hash32: # Check and update deposit data when deposit root is requested if self.latest_processed_block_number < block_number: for blk_number in range(self.latest_processed_block_number + 1, block_number + 1): - deposit_logs = self.get_logs(blk_number) + deposit_logs = self.get_logs(BlockNumber(blk_number)) self.deposits += tuple( convert_deposit_log_to_deposit_data(deposit_log) for deposit_log in deposit_logs diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index c99e9cb451..418578bfa9 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -62,6 +62,7 @@ def _w3_get_block(w3: Web3, *args: Any, **kwargs: Any) -> Eth1Block: class Eth1Monitor(Service): + logger = logging.getLogger('trinity.eth1_monitor') _eth1_data_provider: BaseEth1DataProvider @@ -281,7 +282,6 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: raise Eth1MonitorValidationError(f"Fail to get latest block") target_block_number = BlockNumber(block.number - self._num_blocks_confirmed) from_block_number = self.highest_processed_block_number - self.logger.info("Eth1 Data Provider latest block: %s, %s, %s", block.number, target_block_number, from_block_number) if target_block_number > from_block_number: # From `highest_processed_block_number` to `target_block_number` for block_number in range( From 6ffe1adaf6e8c8038b51bc3c158b5f9b4608da41 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Dec 2019 18:58:41 +0800 Subject: [PATCH 18/18] Apply PR feedback: raise `BlockNotFound` if web3.eth.getBlock return None --- .../components/eth2/eth1_monitor/component.py | 2 +- .../eth2/eth1_monitor/eth1_data_provider.py | 6 +++-- .../eth2/eth1_monitor/eth1_monitor.py | 23 +++++++++++-------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/trinity/components/eth2/eth1_monitor/component.py b/trinity/components/eth2/eth1_monitor/component.py index 96c4ca3fff..b41ec293a8 100644 --- a/trinity/components/eth2/eth1_monitor/component.py +++ b/trinity/components/eth2/eth1_monitor/component.py @@ -91,7 +91,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: ) pubkeys, privkeys, withdrawal_credentials = create_keypair_and_mock_withdraw_credentials( config, - key_set, + key_set, # type: ignore ) initial_deposits = ( create_mock_deposit_data( diff --git a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py index dbf7ffbd1e..8b88ab0246 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_data_provider.py +++ b/trinity/components/eth2/eth1_monitor/eth1_data_provider.py @@ -2,9 +2,11 @@ import time from typing import Any, Dict, NamedTuple, Optional, Tuple, Union -from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 +from eth.exceptions import BlockNotFound +from eth_typing import Address, BLSPubkey, BLSSignature, BlockNumber, Hash32 from eth_utils import encode_hex, event_abi_to_log_topic + from web3 import Web3 from web3.utils.events import get_event_data @@ -98,7 +100,7 @@ def __init__( def get_block(self, arg: Union[Hash32, int, str]) -> Optional[Eth1Block]: block_dict = self.w3.eth.getBlock(arg) if block_dict is None: - return None + raise BlockNotFound return Eth1Block( block_hash=Hash32(block_dict["hash"]), number=BlockNumber(block_dict["number"]), diff --git a/trinity/components/eth2/eth1_monitor/eth1_monitor.py b/trinity/components/eth2/eth1_monitor/eth1_monitor.py index 418578bfa9..f2b19ad964 100644 --- a/trinity/components/eth2/eth1_monitor/eth1_monitor.py +++ b/trinity/components/eth2/eth1_monitor/eth1_monitor.py @@ -20,6 +20,7 @@ from web3 import Web3 from eth.abc import AtomicDatabaseAPI +from eth.exceptions import BlockNotFound from eth_utils import humanize_hash @@ -127,8 +128,9 @@ def _handle_get_distance(self, req: GetDistanceRequest) -> GetDistanceResponse: """ Handle requests for `get_distance` from the event bus. """ - block = self._eth1_data_provider.get_block(req.block_hash) - if block is None: + try: + block = self._eth1_data_provider.get_block(req.block_hash) + except BlockNotFound: raise Eth1MonitorValidationError( f"Block does not exist for block_hash={humanize_hash(req.block_hash)}" ) @@ -192,8 +194,9 @@ def _get_eth1_data( f"`distance`={distance}, ", f"eth1_voting_period_start_block_number={eth1_voting_period_start_block_number}", ) - block = self._eth1_data_provider.get_block(target_block_number) - if block is None: + try: + block = self._eth1_data_provider.get_block(target_block_number) + except BlockNotFound: raise Eth1MonitorValidationError( f"Block does not exist for block number={target_block_number}" ) @@ -277,9 +280,10 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: `latest_block.number - self._num_blocks_confirmed`. """ while True: - block = self._eth1_data_provider.get_block("latest") - if block is None: - raise Eth1MonitorValidationError(f"Fail to get latest block") + try: + block = self._eth1_data_provider.get_block("latest") + except BlockNotFound: + raise Eth1MonitorValidationError("Fail to get latest block") target_block_number = BlockNumber(block.number - self._num_blocks_confirmed) from_block_number = self.highest_processed_block_number if target_block_number > from_block_number: @@ -287,8 +291,9 @@ async def _new_blocks(self) -> AsyncGenerator[Eth1Block, None]: for block_number in range( from_block_number + 1, target_block_number + 1 ): - block = self._eth1_data_provider.get_block(BlockNumber(block_number)) - if block is None: + try: + block = self._eth1_data_provider.get_block(BlockNumber(block_number)) + except BlockNotFound: raise Eth1MonitorValidationError( f"Block does not exist for block number={block_number}" )