Expand peer stats tracking (#1225)
* extract tracker concept from normalizer

* expand stats gathering on peer responses

* pr feedback
pipermerriam committed Aug 31, 2018
1 parent 06355d0 commit 9ba2c1f
Showing 18 changed files with 449 additions and 105 deletions.
14 changes: 9 additions & 5 deletions tests/trinity/core/p2p-proto/test_stats.py
@@ -71,7 +71,7 @@ async def les_peer_and_remote(request, event_loop):
async def test_eth_get_headers_empty_stats(eth_peer_and_remote):
peer, remote = eth_peer_and_remote
stats = peer.requests.get_stats()
-assert all(status == 'Uninitialized' for status in stats.values())
+assert all(status == 'None' for status in stats.values())
assert 'BlockHeaders' in stats.keys()


@@ -90,8 +90,10 @@ async def send_headers():

stats = peer.requests.get_stats()

-assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
-assert stats['BlockHeaders'].endswith(', timeouts=0')
+assert stats['BlockHeaders'].startswith('msgs={0} items={0} rtt='.format(idx))
+assert 'timeouts=0' in stats['BlockHeaders']
+assert 'quality=' in stats['BlockHeaders']
+assert 'ips=' in stats['BlockHeaders']


@pytest.mark.asyncio
@@ -113,5 +115,7 @@ async def test_les_get_headers_stats(les_peer_and_remote):

stats = peer.requests.get_stats()

-assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
-assert stats['BlockHeaders'].endswith(', timeouts=0')
+assert stats['BlockHeaders'].startswith('msgs={0} items={0} rtt='.format(idx))
+assert 'timeouts=0' in stats['BlockHeaders']
+assert 'quality=' in stats['BlockHeaders']
+assert 'ips=' in stats['BlockHeaders']
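
Read together, the updated assertions describe a per-command stats string of roughly the shape sketched below. Only the substrings asserted above are guaranteed by the tests; the concrete values, spacing, and field order after the `rtt=` prefix are illustrative assumptions.

# Illustrative shape of the new stats string; the numbers are made up.
example = 'msgs=3 items=3 rtt=0.08 timeouts=0 quality=95 ips=37.5'

assert example.startswith('msgs={0} items={0} rtt='.format(3))
assert 'timeouts=0' in example
assert 'quality=' in example
assert 'ips=' in example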
2 changes: 2 additions & 0 deletions trinity/protocol/common/constants.py
@@ -0,0 +1,2 @@
+# The default timeout for a round trip API request and response from a peer.
+ROUND_TRIP_TIMEOUT = 20.0
15 changes: 12 additions & 3 deletions trinity/protocol/common/exchanges.py
@@ -14,7 +14,12 @@
)

from trinity.utils.decorators import classproperty
-from .managers import ExchangeManager
+from .trackers import (
+BasePerformanceTracker,
+)
+from .managers import (
+ExchangeManager,
+)
from .normalizers import BaseNormalizer
from .types import (
TResponsePayload,
@@ -42,17 +47,20 @@ class BaseExchange(ABC, Generic[TRequestPayload, TResponsePayload, TResult]):
"""

request_class: Type[BaseRequest[TRequestPayload]]
+tracker_class: Type[BasePerformanceTracker[Any, TResult]]
+tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], TResult]

def __init__(self, mgr: ExchangeManager[TRequestPayload, TResponsePayload, TResult]) -> None:
self._manager = mgr
+self.tracker = self.tracker_class()

async def get_result(
self,
request: BaseRequest[TRequestPayload],
normalizer: BaseNormalizer[TResponsePayload, TResult],
result_validator: BaseValidator[TResult],
payload_validator: Callable[[TRequestPayload, TResponsePayload], None],
-timeout: int = None) -> TResult:
+timeout: float = None) -> TResult:
"""
This is a light convenience wrapper around the ExchangeManager's get_result() method.
@@ -71,7 +79,8 @@ async def get_result(
normalizer,
result_validator.validate_result,
message_validator,
-timeout=timeout
+self.tracker,
+timeout,
)

@classproperty
20 changes: 10 additions & 10 deletions trinity/protocol/common/handlers.py
@@ -1,8 +1,7 @@
-from abc import abstractmethod
+from abc import ABC, abstractmethod
from typing import (
Any,
Dict,
-Set,
Type,
)

@@ -16,29 +15,30 @@
)


-class BaseExchangeHandler:
-_exchange_managers: Set[ExchangeManager[Any, Any, Any]]

+class BaseExchangeHandler(ABC):
@property
@abstractmethod
-def _exchanges(self) -> Dict[str, Type[BaseExchange[Any, Any, Any]]]:
+def _exchange_config(self) -> Dict[str, Type[BaseExchange[Any, Any, Any]]]:
pass

def __init__(self, peer: BasePeer) -> None:
self._peer = peer
-self._exchange_managers = set()

-for attr, exchange_cls in self._exchanges.items():
+for attr, exchange_cls in self._exchange_config.items():
if hasattr(self, attr):
raise AttributeError(
"Unable to set manager on attribute `{0}` which is already "
"present on the class: {1}".format(attr, getattr(self, attr))
)
manager: ExchangeManager[Any, Any, Any]
manager = ExchangeManager(self._peer, exchange_cls.response_cmd_type, peer.cancel_token)
-self._exchange_managers.add(manager)
exchange = exchange_cls(manager)
setattr(self, attr, exchange)

def get_stats(self) -> Dict[str, str]:
-return dict(exchange_manager.get_stats() for exchange_manager in self._exchange_managers)
+exchanges = tuple(getattr(self, key) for key in self._exchange_config.keys())
+return {
+exchange.response_cmd_type.__name__: exchange.tracker.get_stats()
+for exchange
+in exchanges
+}
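
The handler no longer keeps a separate set of ExchangeManager instances; it builds its exchange attributes from the declarative `_exchange_config` mapping and reads stats straight off each exchange's tracker. Stripped of the trinity-specific types, the pattern is roughly the following sketch; every name here is an illustrative stand-in, not the committed code.

from abc import ABC, abstractmethod
from typing import Dict, Type


class ToyExchange:
    """Stand-in for a BaseExchange subclass; the real one also owns a tracker."""

    def get_stats(self) -> str:
        return 'None'  # what a tracker reports before any messages


class ToyExchangeHandler(ABC):
    @property
    @abstractmethod
    def _exchange_config(self) -> Dict[str, Type[ToyExchange]]:
        ...

    def __init__(self) -> None:
        # Bind one exchange instance per configured attribute name.
        for attr, exchange_cls in self._exchange_config.items():
            setattr(self, attr, exchange_cls())

    def get_stats(self) -> Dict[str, str]:
        # Keyed by attribute name here; the real handler keys by the response
        # command name (e.g. 'BlockHeaders').
        return {
            attr: getattr(self, attr).get_stats()
            for attr in self._exchange_config
        }


class ToyETHHandler(ToyExchangeHandler):
    _exchange_config = {'get_block_headers': ToyExchange}


handler = ToyETHHandler()
print(handler.get_stats())  # {'get_block_headers': 'None'}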
74 changes: 25 additions & 49 deletions trinity/protocol/common/managers.py
@@ -1,6 +1,6 @@
import asyncio
import time
-from typing import ( # noqa: F401 -- AsyncGenerator needed by mypy
+from typing import (
Any,
AsyncGenerator,
Callable,
@@ -28,38 +28,15 @@

from trinity.exceptions import AlreadyWaiting

+from .constants import ROUND_TRIP_TIMEOUT
from .normalizers import BaseNormalizer
+from .trackers import BasePerformanceTracker
from .types import (
TResponsePayload,
TResult,
)


-class ResponseTimeTracker:

-def __init__(self) -> None:
-self.total_msgs = 0
-self.total_items = 0
-self.total_timeouts = 0
-self.total_response_time = 0.0

-def get_stats(self) -> str:
-if not self.total_msgs:
-return 'None'
-avg_rtt = self.total_response_time / self.total_msgs
-if not self.total_items:
-per_item_rtt = 0.0
-else:
-per_item_rtt = self.total_response_time / self.total_items
-return 'count=%d, items=%d, avg_rtt=%.2f, avg_time_per_item=%.5f, timeouts=%d' % (
-self.total_msgs, self.total_items, avg_rtt, per_item_rtt, self.total_timeouts)

-def add(self, time: float, size: int) -> None:
-self.total_msgs += 1
-self.total_items += size
-self.total_response_time += time


class ResponseCandidateStream(
PeerSubscriber,
BaseService,
@@ -74,7 +51,7 @@ def subscription_msg_types(self) -> Set[Type[Command]]:

msg_queue_maxsize = 100

-response_timout: int = 20
+response_timout: float = ROUND_TRIP_TIMEOUT

pending_request: Tuple[float, 'asyncio.Future[TResponsePayload]'] = None

@@ -87,13 +64,14 @@ def __init__(
token: CancelToken) -> None:
super().__init__(token)
self._peer = peer
-self.response_times = ResponseTimeTracker()
self.response_msg_type = response_msg_type

async def payload_candidates(
self,
request: BaseRequest[TRequestPayload],
-timeout: int = None) -> 'AsyncGenerator[TResponsePayload, None]':
+tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], Any],
+*,
+timeout: float = None) -> AsyncGenerator[TResponsePayload, None]:
"""
Make a request and iterate through candidates for a valid response.
@@ -105,15 +83,20 @@ async def payload_candidates(

self._request(request)
while self._is_pending():
-yield await self._get_payload(timeout)
+try:
+yield await self._get_payload(timeout)
+except TimeoutError:
+tracker.record_timeout(timeout)
+raise

@property
def response_msg_name(self) -> str:
return self.response_msg_type.__name__

-def complete_request(self, item_count: int) -> None:
+def complete_request(self) -> None:
+if self.pending_request is None:
+self.logger.warning("`complete_request` was called when there was no pending request")
self.pending_request = None
-self.response_times.add(self.last_response_time, item_count)

#
# Service API
@@ -143,13 +126,10 @@ async def _handle_msg(self, msg: TResponsePayload) -> None:
self.last_response_time = time.perf_counter() - send_time
future.set_result(msg)

-async def _get_payload(self, timeout: int) -> TResponsePayload:
+async def _get_payload(self, timeout: float) -> TResponsePayload:
send_time, future = self.pending_request
try:
payload = await self.wait(future, timeout=timeout)
-except TimeoutError:
-self.response_times.total_timeouts += 1
-raise
finally:
self.pending_request = None

@@ -192,9 +172,6 @@ def deregister_peer(self, peer: BasePeer) -> None:
_, future = self.pending_request
future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone"))

-def get_stats(self) -> Tuple[str, str]:
-return (self.response_msg_name, self.response_times.get_stats())

def __repr__(self) -> str:
return f'<ResponseCandidateStream({self._peer!s}, {self.response_msg_type!r})>'

@@ -230,14 +207,15 @@ async def get_result(
normalizer: BaseNormalizer[TResponsePayload, TResult],
validate_result: Callable[[TResult], None],
payload_validator: Callable[[TResponsePayload], None],
-timeout: int = None) -> TResult:
+tracker: BasePerformanceTracker[BaseRequest[TRequestPayload], TResult],
+timeout: float = None) -> TResult:

if not self.is_operational:
raise ValidationError("You must call `launch_service` before initiating a peer request")

stream = self._response_stream

-async for payload in stream.payload_candidates(request, timeout):
+async for payload in stream.payload_candidates(request, tracker, timeout=timeout):
try:
payload_validator(payload)

@@ -256,8 +234,12 @@ async def get_result(
)
continue
else:
-num_items = normalizer.get_num_results(result)
-stream.complete_request(num_items)
+tracker.record_response(
+stream.last_response_time,
+request,
+result,
+)
+stream.complete_request()
return result

raise ValidationError("Manager is not pending a response, but no valid response received")
@@ -268,9 +250,3 @@ def service(self) -> BaseService:
This service that needs to be running for calls to execute properly
"""
return self._response_stream

-def get_stats(self) -> Tuple[str, str]:
-if self._response_stream is None:
-return (self._response_command_type.__name__, 'Uninitialized')
-else:
-return self._response_stream.get_stats()
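
The new trinity/protocol/common/trackers.py module is among the 18 changed files but is not shown in this excerpt. Judging from the call sites above (`tracker.record_timeout(...)`, `tracker.record_response(elapsed, request, result)`, `tracker.get_stats()`) and from the removed `ResponseTimeTracker`, its interface plausibly looks like the sketch below; method bodies, helper names, and the stats formatting are assumptions rather than the committed code.

from abc import ABC, abstractmethod
from typing import Generic, Tuple, TypeVar

TRequest = TypeVar('TRequest')
TResult = TypeVar('TResult')


class BasePerformanceTracker(ABC, Generic[TRequest, TResult]):
    """Sketch of the tracker interface inferred from its call sites."""

    def __init__(self) -> None:
        self.total_msgs = 0
        self.total_items = 0
        self.total_timeouts = 0
        self.total_response_time = 0.0

    @abstractmethod
    def _get_result_size(self, result: TResult) -> int:
        """Count the items in a normalized result; in this commit that
        responsibility moves here from BaseNormalizer.get_num_results."""
        ...

    def record_timeout(self, timeout: float) -> None:
        self.total_timeouts += 1

    def record_response(self, elapsed: float, request: TRequest, result: TResult) -> None:
        # The real tracker presumably also uses `request` and per-response
        # timing to derive the quality= and ips= figures seen in the tests.
        self.total_msgs += 1
        self.total_items += self._get_result_size(result)
        self.total_response_time += elapsed

    def get_stats(self) -> str:
        if not self.total_msgs:
            return 'None'
        rtt = self.total_response_time / self.total_msgs
        # Simplified: the committed format also reports quality= and ips=.
        return 'msgs=%d items=%d rtt=%.2f timeouts=%d' % (
            self.total_msgs, self.total_items, rtt, self.total_timeouts)


class TupleResultTracker(BasePerformanceTracker[object, Tuple[object, ...]]):
    """Toy concrete tracker, for illustration only."""

    def _get_result_size(self, result: Tuple[object, ...]) -> int:
        return len(result)


tracker = TupleResultTracker()
tracker.record_response(0.08, request=None, result=('a', 'b', 'c'))
print(tracker.get_stats())  # msgs=1 items=3 rtt=0.08 timeouts=0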
12 changes: 0 additions & 12 deletions trinity/protocol/common/normalizers.py
@@ -28,14 +28,6 @@ def normalize_result(message: TResponsePayload) -> TResult:
"""
raise NotImplementedError()

-@staticmethod
-@abstractmethod
-def get_num_results(result: TResult) -> int:
-"""
-Count the number of items returned in the result.
-"""
-raise NotImplementedError()


TPassthrough = TypeVar('TPassthrough', bound=PayloadType)

@@ -44,7 +36,3 @@ class NoopNormalizer(BaseNormalizer[TPassthrough, TPassthrough]):
@staticmethod
def normalize_result(message: TPassthrough) -> TPassthrough:
return message

-@staticmethod
-def get_num_results(result: TPassthrough) -> int:
-return len(result)
