From 9ba2c1fae5de83ae3fe55688197630e65262f1b9 Mon Sep 17 00:00:00 2001 From: Piper Merriam Date: Fri, 31 Aug 2018 12:17:17 -0600 Subject: [PATCH] Expand peer stats tracking (#1225) * extract tracker concept from normalizer * expand stats gathering on peer responses * pr feedback --- tests/trinity/core/p2p-proto/test_stats.py | 14 +- trinity/protocol/common/constants.py | 2 + trinity/protocol/common/exchanges.py | 15 +- trinity/protocol/common/handlers.py | 20 +-- trinity/protocol/common/managers.py | 74 +++----- trinity/protocol/common/normalizers.py | 12 -- trinity/protocol/common/trackers.py | 194 +++++++++++++++++++++ trinity/protocol/common/types.py | 4 + trinity/protocol/eth/exchanges.py | 48 ++++- trinity/protocol/eth/handlers.py | 2 +- trinity/protocol/eth/normalizers.py | 12 -- trinity/protocol/eth/requests.py | 5 + trinity/protocol/eth/trackers.py | 88 ++++++++++ trinity/protocol/les/exchanges.py | 6 +- trinity/protocol/les/handlers.py | 2 +- trinity/protocol/les/normalizers.py | 4 - trinity/protocol/les/trackers.py | 39 +++++ trinity/utils/logging.py | 13 ++ 18 files changed, 449 insertions(+), 105 deletions(-) create mode 100644 trinity/protocol/common/constants.py create mode 100644 trinity/protocol/common/trackers.py create mode 100644 trinity/protocol/eth/trackers.py create mode 100644 trinity/protocol/les/trackers.py diff --git a/tests/trinity/core/p2p-proto/test_stats.py b/tests/trinity/core/p2p-proto/test_stats.py index bf9ee37709..e5eda234fa 100644 --- a/tests/trinity/core/p2p-proto/test_stats.py +++ b/tests/trinity/core/p2p-proto/test_stats.py @@ -71,7 +71,7 @@ async def les_peer_and_remote(request, event_loop): async def test_eth_get_headers_empty_stats(eth_peer_and_remote): peer, remote = eth_peer_and_remote stats = peer.requests.get_stats() - assert all(status == 'Uninitialized' for status in stats.values()) + assert all(status == 'None' for status in stats.values()) assert 'BlockHeaders' in stats.keys() @@ -90,8 +90,10 @@ async def send_headers(): stats = peer.requests.get_stats() - assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx)) - assert stats['BlockHeaders'].endswith(', timeouts=0') + assert stats['BlockHeaders'].startswith('msgs={0} items={0} rtt='.format(idx)) + assert 'timeouts=0' in stats['BlockHeaders'] + assert 'quality=' in stats['BlockHeaders'] + assert 'ips=' in stats['BlockHeaders'] @pytest.mark.asyncio @@ -113,5 +115,7 @@ async def test_les_get_headers_stats(les_peer_and_remote): stats = peer.requests.get_stats() - assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx)) - assert stats['BlockHeaders'].endswith(', timeouts=0') + assert stats['BlockHeaders'].startswith('msgs={0} items={0} rtt='.format(idx)) + assert 'timeouts=0' in stats['BlockHeaders'] + assert 'quality=' in stats['BlockHeaders'] + assert 'ips=' in stats['BlockHeaders'] diff --git a/trinity/protocol/common/constants.py b/trinity/protocol/common/constants.py new file mode 100644 index 0000000000..06596aadda --- /dev/null +++ b/trinity/protocol/common/constants.py @@ -0,0 +1,2 @@ +# The default timeout for a round trip API request and response from a peer. +ROUND_TRIP_TIMEOUT = 20.0 diff --git a/trinity/protocol/common/exchanges.py b/trinity/protocol/common/exchanges.py index 4999edee6b..8db3464979 100644 --- a/trinity/protocol/common/exchanges.py +++ b/trinity/protocol/common/exchanges.py @@ -14,7 +14,12 @@ ) from trinity.utils.decorators import classproperty -from .managers import ExchangeManager +from .trackers import ( + BasePerformanceTracker, +) +from .managers import ( + ExchangeManager, +) from .normalizers import BaseNormalizer from .types import ( TResponsePayload, @@ -42,9 +47,12 @@ class BaseExchange(ABC, Generic[TRequestPayload, TResponsePayload, TResult]): """ request_class: Type[BaseRequest[TRequestPayload]] + tracker_class: Type[BasePerformanceTracker[Any, TResult]] + tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], TResult] def __init__(self, mgr: ExchangeManager[TRequestPayload, TResponsePayload, TResult]) -> None: self._manager = mgr + self.tracker = self.tracker_class() async def get_result( self, @@ -52,7 +60,7 @@ async def get_result( normalizer: BaseNormalizer[TResponsePayload, TResult], result_validator: BaseValidator[TResult], payload_validator: Callable[[TRequestPayload, TResponsePayload], None], - timeout: int = None) -> TResult: + timeout: float = None) -> TResult: """ This is a light convenience wrapper around the ExchangeManager's get_result() method. @@ -71,7 +79,8 @@ async def get_result( normalizer, result_validator.validate_result, message_validator, - timeout=timeout + self.tracker, + timeout, ) @classproperty diff --git a/trinity/protocol/common/handlers.py b/trinity/protocol/common/handlers.py index 8d78ff4069..a0283afcec 100644 --- a/trinity/protocol/common/handlers.py +++ b/trinity/protocol/common/handlers.py @@ -1,8 +1,7 @@ -from abc import abstractmethod +from abc import ABC, abstractmethod from typing import ( Any, Dict, - Set, Type, ) @@ -16,19 +15,16 @@ ) -class BaseExchangeHandler: - _exchange_managers: Set[ExchangeManager[Any, Any, Any]] - +class BaseExchangeHandler(ABC): @property @abstractmethod - def _exchanges(self) -> Dict[str, Type[BaseExchange[Any, Any, Any]]]: + def _exchange_config(self) -> Dict[str, Type[BaseExchange[Any, Any, Any]]]: pass def __init__(self, peer: BasePeer) -> None: self._peer = peer - self._exchange_managers = set() - for attr, exchange_cls in self._exchanges.items(): + for attr, exchange_cls in self._exchange_config.items(): if hasattr(self, attr): raise AttributeError( "Unable to set manager on attribute `{0}` which is already " @@ -36,9 +32,13 @@ def __init__(self, peer: BasePeer) -> None: ) manager: ExchangeManager[Any, Any, Any] manager = ExchangeManager(self._peer, exchange_cls.response_cmd_type, peer.cancel_token) - self._exchange_managers.add(manager) exchange = exchange_cls(manager) setattr(self, attr, exchange) def get_stats(self) -> Dict[str, str]: - return dict(exchange_manager.get_stats() for exchange_manager in self._exchange_managers) + exchanges = tuple(getattr(self, key) for key in self._exchange_config.keys()) + return { + exchange.response_cmd_type.__name__: exchange.tracker.get_stats() + for exchange + in exchanges + } diff --git a/trinity/protocol/common/managers.py b/trinity/protocol/common/managers.py index 447891560d..678d83309a 100644 --- a/trinity/protocol/common/managers.py +++ b/trinity/protocol/common/managers.py @@ -1,6 +1,6 @@ import asyncio import time -from typing import ( # noqa: F401 -- AsyncGenerator needed by mypy +from typing import ( Any, AsyncGenerator, Callable, @@ -28,38 +28,15 @@ from trinity.exceptions import AlreadyWaiting +from .constants import ROUND_TRIP_TIMEOUT from .normalizers import BaseNormalizer +from .trackers import BasePerformanceTracker from .types import ( TResponsePayload, TResult, ) -class ResponseTimeTracker: - - def __init__(self) -> None: - self.total_msgs = 0 - self.total_items = 0 - self.total_timeouts = 0 - self.total_response_time = 0.0 - - def get_stats(self) -> str: - if not self.total_msgs: - return 'None' - avg_rtt = self.total_response_time / self.total_msgs - if not self.total_items: - per_item_rtt = 0.0 - else: - per_item_rtt = self.total_response_time / self.total_items - return 'count=%d, items=%d, avg_rtt=%.2f, avg_time_per_item=%.5f, timeouts=%d' % ( - self.total_msgs, self.total_items, avg_rtt, per_item_rtt, self.total_timeouts) - - def add(self, time: float, size: int) -> None: - self.total_msgs += 1 - self.total_items += size - self.total_response_time += time - - class ResponseCandidateStream( PeerSubscriber, BaseService, @@ -74,7 +51,7 @@ def subscription_msg_types(self) -> Set[Type[Command]]: msg_queue_maxsize = 100 - response_timout: int = 20 + response_timout: float = ROUND_TRIP_TIMEOUT pending_request: Tuple[float, 'asyncio.Future[TResponsePayload]'] = None @@ -87,13 +64,14 @@ def __init__( token: CancelToken) -> None: super().__init__(token) self._peer = peer - self.response_times = ResponseTimeTracker() self.response_msg_type = response_msg_type async def payload_candidates( self, request: BaseRequest[TRequestPayload], - timeout: int = None) -> 'AsyncGenerator[TResponsePayload, None]': + tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], Any], + *, + timeout: float = None) -> AsyncGenerator[TResponsePayload, None]: """ Make a request and iterate through candidates for a valid response. @@ -105,15 +83,20 @@ async def payload_candidates( self._request(request) while self._is_pending(): - yield await self._get_payload(timeout) + try: + yield await self._get_payload(timeout) + except TimeoutError: + tracker.record_timeout(timeout) + raise @property def response_msg_name(self) -> str: return self.response_msg_type.__name__ - def complete_request(self, item_count: int) -> None: + def complete_request(self) -> None: + if self.pending_request is None: + self.logger.warning("`complete_request` was called when there was no pending request") self.pending_request = None - self.response_times.add(self.last_response_time, item_count) # # Service API @@ -143,13 +126,10 @@ async def _handle_msg(self, msg: TResponsePayload) -> None: self.last_response_time = time.perf_counter() - send_time future.set_result(msg) - async def _get_payload(self, timeout: int) -> TResponsePayload: + async def _get_payload(self, timeout: float) -> TResponsePayload: send_time, future = self.pending_request try: payload = await self.wait(future, timeout=timeout) - except TimeoutError: - self.response_times.total_timeouts += 1 - raise finally: self.pending_request = None @@ -192,9 +172,6 @@ def deregister_peer(self, peer: BasePeer) -> None: _, future = self.pending_request future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone")) - def get_stats(self) -> Tuple[str, str]: - return (self.response_msg_name, self.response_times.get_stats()) - def __repr__(self) -> str: return f'' @@ -230,14 +207,15 @@ async def get_result( normalizer: BaseNormalizer[TResponsePayload, TResult], validate_result: Callable[[TResult], None], payload_validator: Callable[[TResponsePayload], None], - timeout: int = None) -> TResult: + tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], TResult], + timeout: float = None) -> TResult: if not self.is_operational: raise ValidationError("You must call `launch_service` before initiating a peer request") stream = self._response_stream - async for payload in stream.payload_candidates(request, timeout): + async for payload in stream.payload_candidates(request, tracker, timeout=timeout): try: payload_validator(payload) @@ -256,8 +234,12 @@ async def get_result( ) continue else: - num_items = normalizer.get_num_results(result) - stream.complete_request(num_items) + tracker.record_response( + stream.last_response_time, + request, + result, + ) + stream.complete_request() return result raise ValidationError("Manager is not pending a response, but no valid response received") @@ -268,9 +250,3 @@ def service(self) -> BaseService: This service that needs to be running for calls to execute properly """ return self._response_stream - - def get_stats(self) -> Tuple[str, str]: - if self._response_stream is None: - return (self._response_command_type.__name__, 'Uninitialized') - else: - return self._response_stream.get_stats() diff --git a/trinity/protocol/common/normalizers.py b/trinity/protocol/common/normalizers.py index b0eb8b773b..2849b64663 100644 --- a/trinity/protocol/common/normalizers.py +++ b/trinity/protocol/common/normalizers.py @@ -28,14 +28,6 @@ def normalize_result(message: TResponsePayload) -> TResult: """ raise NotImplementedError() - @staticmethod - @abstractmethod - def get_num_results(result: TResult) -> int: - """ - Count the number of items returned in the result. - """ - raise NotImplementedError() - TPassthrough = TypeVar('TPassthrough', bound=PayloadType) @@ -44,7 +36,3 @@ class NoopNormalizer(BaseNormalizer[TPassthrough, TPassthrough]): @staticmethod def normalize_result(message: TPassthrough) -> TPassthrough: return message - - @staticmethod - def get_num_results(result: TPassthrough) -> int: - return len(result) diff --git a/trinity/protocol/common/trackers.py b/trinity/protocol/common/trackers.py new file mode 100644 index 0000000000..c8d450f1f5 --- /dev/null +++ b/trinity/protocol/common/trackers.py @@ -0,0 +1,194 @@ +from abc import ABC, abstractmethod +from typing import ( + Any, + Generic, + Optional, + TypeVar, + Union, +) + +from eth_utils import ValidationError + +from p2p.protocol import ( + BaseRequest, +) + +from trinity.utils.logging import HasTraceLogger +from .constants import ROUND_TRIP_TIMEOUT +from .types import ( + TResult, +) + + +TRequest = TypeVar('TRequest', bound=BaseRequest[Any]) + + +class EMA: + """ + Represents an exponential moving average. + https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + + Smoothing factor, or "alpha" of the exponential moving average. + + - Closer to 0 gives you smoother, slower-to-update, data + - Closer to 1 gives you choppier, quicker-to-update, data + + .. note:: + + A smoothing factor of 1 would completely ignore history whereas 0 would + completely ignore new data + + + The initial value is the starting value for the EMA + """ + def __init__(self, initial_value: float, smoothing_factor: float) -> None: + self._value = initial_value + if 0 < smoothing_factor < 1: + self._alpha = smoothing_factor + else: + raise ValidationError("Smoothing factor of EMA must be between 0 and 1") + + def update(self, scalar: Union[int, float]) -> None: + self._value = (self._value * (1 - self._alpha)) + (scalar * self._alpha) + + @property + def value(self) -> float: + return self._value + + +class BasePerformanceTracker(ABC, HasTraceLogger, Generic[TRequest, TResult]): + def __init__(self) -> None: + self.total_msgs = 0 + self.total_items = 0 + self.total_timeouts = 0 + self.total_response_time = 0.0 + + # a percentage between 0-100 for how much of the requested + # data the peer typically returns with 100 meaning they consistently + # return all of the data we request and 0 meaning they only return + # empty responses. + self.response_quality_ema = EMA(initial_value=0, smoothing_factor=0.05) + + # an EMA of the round trip request/response time + self.round_trip_ema = EMA(initial_value=ROUND_TRIP_TIMEOUT, smoothing_factor=0.05) + + # an EMA of the items per second + self.items_per_second_ema = EMA(initial_value=0, smoothing_factor=0.05) + + @abstractmethod + def _get_request_size(self, request: TRequest) -> Optional[int]: + """ + The request size represents the number of *things* that were requested, + not taking into account the sizes of individual items. + + Some requests cannot be used to determine the expected size. In this + case `None` should be returned. (Specifically the `GetBlockHeaders` + anchored to a block hash. + """ + pass + + @abstractmethod + def _get_result_size(self, result: TResult) -> int: + """ + The result size represents the number of *things* that were returned, + not taking into account the sizes of individual items. + """ + pass + + @abstractmethod + def _get_result_item_count(self, result: TResult) -> int: + """ + The item count is intended to more accurately represent the size of the + response, taking into account things like the size of individual + response items such as the number of transactions in a block. + """ + pass + + def get_stats(self) -> str: + """ + Return a human readable string representing the stats for this tracker. + """ + if not self.total_msgs: + return 'None' + avg_rtt = self.total_response_time / self.total_msgs + if not self.total_items: + items_per_second = 0.0 + else: + items_per_second = self.total_response_time / self.total_items + + # msgs: total number of messages + # items: total number of items + # rtt: round-trip-time (avg/ema) + # ips: items-per-second (avg/ema) + # timeouts: total number of timeouts + # missing: total number of missing response items + # quality: 0-100 for how complete responses are + return ( + 'msgs=%d items=%d rtt=%.2f/%.2f ips=%.5f/%.5f ' + 'timeouts=%d quality=%d' + ) % ( + self.total_msgs, + self.total_items, + avg_rtt, + self.round_trip_ema.value, + items_per_second, + self.items_per_second_ema.value, + self.total_timeouts, + int(self.response_quality_ema.value), + ) + + def record_timeout(self, timeout: float) -> None: + self.total_msgs += 1 + self.total_timeouts += 1 + self.response_quality_ema.update(0) + self.items_per_second_ema.update(0) + self.round_trip_ema.update(timeout) + + def record_response(self, + elapsed: float, + request: TRequest, + result: TResult) -> None: + self.total_msgs += 1 + + request_size = self._get_request_size(request) + response_size = self._get_result_size(result) + num_items = self._get_result_item_count(result) + + if request_size is None: + # In the event that request size cannot be determined we skip stats + # tracking based on request size. + pass + + elif response_size > request_size: + self.logger.warning( + "%s got oversized response. requested: %d received: %d", + type(self).__name__, + request_size, + response_size, + ) + else: + if request_size == 0: + self.logger.warning( + "%s encountered request for zero items. This should never happen", + type(self).__name__, + ) + # we intentionally don't update the ema here since this is an + # odd and unexpected case. + elif response_size == 0: + self.response_quality_ema.update(0) + else: + percent_returned = 100 * response_size / request_size + self.response_quality_ema.update(percent_returned) + + self.total_items += num_items + self.total_response_time += elapsed + self.round_trip_ema.update(elapsed) + + if elapsed > 0: + throughput = num_items / elapsed + self.items_per_second_ema.update(throughput) + else: + self.logger.warning( + "%s encountered response time of zero. This should never happen", + type(self).__name__, + ) diff --git a/trinity/protocol/common/types.py b/trinity/protocol/common/types.py index 3aa9c86e26..75c2d486f7 100644 --- a/trinity/protocol/common/types.py +++ b/trinity/protocol/common/types.py @@ -21,6 +21,10 @@ # The returned value at the end of an exchange TResult = TypeVar('TResult') +# ( +# (node_hash, node), +# ... +# ) NodeDataBundles = Tuple[Tuple[Hash32, bytes], ...] # (receipts_in_block_a, receipts_in_block_b, ...) diff --git a/trinity/protocol/eth/exchanges.py b/trinity/protocol/eth/exchanges.py index b6ee87f2cd..6a1b0148a1 100644 --- a/trinity/protocol/eth/exchanges.py +++ b/trinity/protocol/eth/exchanges.py @@ -38,6 +38,12 @@ GetNodeDataRequest, GetReceiptsRequest, ) +from .trackers import ( + GetBlockHeadersTracker, + GetBlockBodiesTracker, + GetNodeDataTracker, + GetReceiptsTracker +) from .validators import ( GetBlockBodiesValidator, GetBlockHeadersValidator, @@ -55,6 +61,7 @@ class GetBlockHeadersExchange(BaseGetBlockHeadersExchange): _normalizer = NoopNormalizer[Tuple[BlockHeader, ...]]() request_class = GetBlockHeadersRequest + tracker_class = GetBlockHeadersTracker async def __call__( # type: ignore self, @@ -62,7 +69,7 @@ async def __call__( # type: ignore max_headers: int = None, skip: int = 0, reverse: bool = True, - timeout: int = None) -> Tuple[BlockHeader, ...]: + timeout: float = None) -> Tuple[BlockHeader, ...]: original_request_args = (block_number_or_hash, max_headers, skip, reverse) validator = GetBlockHeadersValidator(*original_request_args) @@ -83,24 +90,42 @@ async def __call__( # type: ignore class GetNodeDataExchange(BaseNodeDataExchange): _normalizer = GetNodeDataNormalizer() request_class = GetNodeDataRequest + tracker_class = GetNodeDataTracker - async def __call__(self, node_hashes: Tuple[Hash32, ...]) -> NodeDataBundles: # type: ignore + async def __call__(self, # type: ignore + node_hashes: Tuple[Hash32, ...], + timeout: float = None) -> NodeDataBundles: validator = GetNodeDataValidator(node_hashes) request = self.request_class(node_hashes) - return await self.get_result(request, self._normalizer, validator, noop_payload_validator) + return await self.get_result( + request, + self._normalizer, + validator, + noop_payload_validator, + timeout, + ) class GetReceiptsExchange(BaseExchange[Tuple[Hash32, ...], ReceiptsByBlock, ReceiptsBundles]): _normalizer = ReceiptsNormalizer() request_class = GetReceiptsRequest + tracker_class = GetReceiptsTracker - async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles: # type: ignore + async def __call__(self, # type: ignore + headers: Tuple[BlockHeader, ...], + timeout: float = None) -> ReceiptsBundles: # type: ignore validator = ReceiptsValidator(headers) block_hashes = tuple(header.hash for header in headers) request = self.request_class(block_hashes) - return await self.get_result(request, self._normalizer, validator, noop_payload_validator) + return await self.get_result( + request, + self._normalizer, + validator, + noop_payload_validator, + timeout, + ) BaseGetBlockBodiesExchange = BaseExchange[ @@ -113,11 +138,20 @@ async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles: class GetBlockBodiesExchange(BaseGetBlockBodiesExchange): _normalizer = GetBlockBodiesNormalizer() request_class = GetBlockBodiesRequest + tracker_class = GetBlockBodiesTracker - async def __call__(self, headers: Tuple[BlockHeader, ...]) -> BlockBodyBundles: # type: ignore + async def __call__(self, # type: ignore + headers: Tuple[BlockHeader, ...], + timeout: float = None) -> BlockBodyBundles: validator = GetBlockBodiesValidator(headers) block_hashes = tuple(header.hash for header in headers) request = self.request_class(block_hashes) - return await self.get_result(request, self._normalizer, validator, noop_payload_validator) + return await self.get_result( + request, + self._normalizer, + validator, + noop_payload_validator, + timeout, + ) diff --git a/trinity/protocol/eth/handlers.py b/trinity/protocol/eth/handlers.py index 3d3d58af10..d5c45b200c 100644 --- a/trinity/protocol/eth/handlers.py +++ b/trinity/protocol/eth/handlers.py @@ -11,7 +11,7 @@ class ETHExchangeHandler(BaseExchangeHandler): - _exchanges = { + _exchange_config = { 'get_block_bodies': GetBlockBodiesExchange, 'get_block_headers': GetBlockHeadersExchange, 'get_node_data': GetNodeDataExchange, diff --git a/trinity/protocol/eth/normalizers.py b/trinity/protocol/eth/normalizers.py index 5286f36e57..e9c3a05766 100644 --- a/trinity/protocol/eth/normalizers.py +++ b/trinity/protocol/eth/normalizers.py @@ -30,10 +30,6 @@ def normalize_result(msg: Tuple[bytes, ...]) -> NodeDataBundles: result = tuple(zip(node_keys, msg)) return result - @staticmethod - def get_num_results(result: NodeDataBundles) -> int: - return len(result) - class ReceiptsNormalizer(BaseNormalizer[ReceiptsByBlock, ReceiptsBundles]): is_normalization_slow = True @@ -43,10 +39,6 @@ def normalize_result(message: ReceiptsByBlock) -> ReceiptsBundles: trie_roots_and_data = tuple(map(make_trie_root_and_nodes, message)) return tuple(zip(message, trie_roots_and_data)) - @staticmethod - def get_num_results(result: ReceiptsBundles) -> int: - return sum(len(item) for item in result) - class GetBlockBodiesNormalizer(BaseNormalizer[Tuple[BlockBody, ...], BlockBodyBundles]): is_normalization_slow = True @@ -64,7 +56,3 @@ def normalize_result(msg: Tuple[BlockBody, ...]) -> BlockBodyBundles: body_bundles = tuple(zip(msg, transaction_roots_and_trie_data, uncles_hashes)) return body_bundles - - @staticmethod - def get_num_results(result: BlockBodyBundles) -> int: - return len(result) diff --git a/trinity/protocol/eth/requests.py b/trinity/protocol/eth/requests.py index 46d302cacf..ccb485b765 100644 --- a/trinity/protocol/eth/requests.py +++ b/trinity/protocol/eth/requests.py @@ -28,6 +28,11 @@ class HeaderRequest(BaseHeaderRequest): + """ + TODO: this should be removed from this module. It exists to allow + `p2p.handlers.PeerRequestHandler` to have a common API between light and + full chains so maybe it should go there + """ max_size = MAX_HEADERS_FETCH def __init__(self, diff --git a/trinity/protocol/eth/trackers.py b/trinity/protocol/eth/trackers.py new file mode 100644 index 0000000000..52f1c48973 --- /dev/null +++ b/trinity/protocol/eth/trackers.py @@ -0,0 +1,88 @@ +from typing import ( + Optional, + Tuple, +) + +from eth.rlp.headers import BlockHeader + +from trinity.protocol.common.trackers import BasePerformanceTracker +from trinity.protocol.common.types import ( + BlockBodyBundles, + NodeDataBundles, + ReceiptsBundles, +) +from trinity.utils.headers import sequence_builder + +from .requests import ( + GetBlockBodiesRequest, + GetBlockHeadersRequest, + GetNodeDataRequest, + GetReceiptsRequest, +) + + +BaseGetBlockHeadersTracker = BasePerformanceTracker[ + GetBlockHeadersRequest, + Tuple[BlockHeader, ...], +] + + +class GetBlockHeadersTracker(BaseGetBlockHeadersTracker): + def _get_request_size(self, request: GetBlockHeadersRequest) -> int: + payload = request.command_payload + if isinstance(payload['block_number_or_hash'], int): + return len(sequence_builder( + start_number=payload['block_number_or_hash'], + max_length=payload['max_headers'], + skip=payload['skip'], + reverse=payload['reverse'], + )) + else: + return None + + def _get_result_size(self, result: Tuple[BlockHeader, ...]) -> Optional[int]: + return len(result) + + def _get_result_item_count(self, result: Tuple[BlockHeader, ...]) -> int: + return len(result) + + +class GetBlockBodiesTracker(BasePerformanceTracker[GetBlockBodiesRequest, BlockBodyBundles]): + def _get_request_size(self, request: GetBlockBodiesRequest) -> Optional[int]: + return len(request.command_payload) + + def _get_result_size(self, result: BlockBodyBundles) -> int: + return len(result) + + def _get_result_item_count(self, result: BlockBodyBundles) -> int: + return sum( + len(body.uncles) + len(body.transactions) + for body, trie_data, uncles_hash + in result + ) + + +class GetReceiptsTracker(BasePerformanceTracker[GetReceiptsRequest, ReceiptsBundles]): + def _get_request_size(self, request: GetReceiptsRequest) -> Optional[int]: + return len(request.command_payload) + + def _get_result_size(self, result: ReceiptsBundles) -> int: + return len(result) + + def _get_result_item_count(self, result: ReceiptsBundles) -> int: + return sum( + len(receipts) + for receipts, trie_data + in result + ) + + +class GetNodeDataTracker(BasePerformanceTracker[GetNodeDataRequest, NodeDataBundles]): + def _get_request_size(self, request: GetNodeDataRequest) -> Optional[int]: + return len(request.command_payload) + + def _get_result_size(self, result: NodeDataBundles) -> int: + return len(result) + + def _get_result_item_count(self, result: NodeDataBundles) -> int: + return len(result) diff --git a/trinity/protocol/les/exchanges.py b/trinity/protocol/les/exchanges.py index b2b3d6e4c4..ba0b249c2e 100644 --- a/trinity/protocol/les/exchanges.py +++ b/trinity/protocol/les/exchanges.py @@ -21,6 +21,9 @@ from .requests import ( GetBlockHeadersRequest, ) +from .trackers import ( + GetBlockHeadersTracker, +) from .validators import ( GetBlockHeadersValidator, match_payload_request_id, @@ -35,6 +38,7 @@ class GetBlockHeadersExchange(LESExchange[Tuple[BlockHeader, ...]]): _normalizer = BlockHeadersNormalizer() request_class = GetBlockHeadersRequest + tracker_class = GetBlockHeadersTracker async def __call__( # type: ignore self, @@ -42,7 +46,7 @@ async def __call__( # type: ignore max_headers: int = None, skip: int = 0, reverse: bool = True, - timeout: int = None) -> Tuple[BlockHeader, ...]: + timeout: float = None) -> Tuple[BlockHeader, ...]: original_request_args = (block_number_or_hash, max_headers, skip, reverse) validator = GetBlockHeadersValidator(*original_request_args) diff --git a/trinity/protocol/les/handlers.py b/trinity/protocol/les/handlers.py index e2b736f0ae..b1555c5159 100644 --- a/trinity/protocol/les/handlers.py +++ b/trinity/protocol/les/handlers.py @@ -6,7 +6,7 @@ class LESExchangeHandler(BaseExchangeHandler): - _exchanges = { + _exchange_config = { 'get_block_headers': GetBlockHeadersExchange, } diff --git a/trinity/protocol/les/normalizers.py b/trinity/protocol/les/normalizers.py index 7dd772c242..6618b059aa 100644 --- a/trinity/protocol/les/normalizers.py +++ b/trinity/protocol/les/normalizers.py @@ -18,7 +18,3 @@ class BlockHeadersNormalizer(LESNormalizer[Tuple[BlockHeader, ...]]): def normalize_result(message: Dict[str, Any]) -> Tuple[BlockHeader, ...]: result = message['headers'] return result - - @staticmethod - def get_num_results(result: Tuple[BlockHeader, ...]) -> int: - return len(result) diff --git a/trinity/protocol/les/trackers.py b/trinity/protocol/les/trackers.py new file mode 100644 index 0000000000..e054b7d2a3 --- /dev/null +++ b/trinity/protocol/les/trackers.py @@ -0,0 +1,39 @@ +from typing import ( + Optional, + Tuple, +) + +from eth.rlp.headers import BlockHeader + +from trinity.protocol.common.trackers import BasePerformanceTracker +from trinity.utils.headers import sequence_builder + +from .requests import ( + GetBlockHeadersRequest, +) + + +BaseGetBlockHeadersTracker = BasePerformanceTracker[ + GetBlockHeadersRequest, + Tuple[BlockHeader, ...], +] + + +class GetBlockHeadersTracker(BaseGetBlockHeadersTracker): + def _get_request_size(self, request: GetBlockHeadersRequest) -> Optional[int]: + payload = request.command_payload['query'] + if isinstance(payload['block_number_or_hash'], int): + return len(sequence_builder( + start_number=payload['block_number_or_hash'], + max_length=payload['max_headers'], + skip=payload['skip'], + reverse=payload['reverse'], + )) + else: + return None + + def _get_result_size(self, result: Tuple[BlockHeader, ...]) -> int: + return len(result) + + def _get_result_item_count(self, result: Tuple[BlockHeader, ...]) -> int: + return len(result) diff --git a/trinity/utils/logging.py b/trinity/utils/logging.py index 8c1a12b57c..f2725737dd 100644 --- a/trinity/utils/logging.py +++ b/trinity/utils/logging.py @@ -40,6 +40,19 @@ LOG_MAX_MB = 5 +class HasTraceLogger: + _logger: TraceLogger = None + + @property + def logger(self) -> TraceLogger: + if self._logger is None: + self._logger = cast( + TraceLogger, + logging.getLogger(self.__module__ + '.' + self.__class__.__name__) + ) + return self._logger + + def setup_log_levels(log_levels: Dict[Union[None, str], int]) -> None: for name, level in log_levels.items(): logger = logging.getLogger(name)