forked from libp2p/py-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ping.py
130 lines (105 loc) · 3.88 KB
/
ping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import logging
import math
import secrets
import time
from typing import Union
import trio
from libp2p.exceptions import ValidationError
from libp2p.host.host_interface import IHost
from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.id import ID as PeerID
from libp2p.typing import TProtocol
# Protocol identifier passed to ``new_stream`` when opening a ping stream.
ID = TProtocol("/ipfs/ping/1.0.0")
# Number of random bytes sent per ping and requested back from the stream.
PING_LENGTH = 32
# Timeout handed to ``trio.fail_after`` while waiting for an incoming ping
# (trio timeouts are expressed in seconds).
RESP_TIMEOUT = 60
# Module logger; every message in this module is emitted at debug level.
logger = logging.getLogger("libp2p.host.ping")
async def handle_ping(stream: INetStream) -> None:
    """Serve ping requests arriving on ``stream``.

    Keeps delegating to ``_handle_ping`` for as long as it reports that more
    pings are expected. If handling a ping raises any ``Exception``, the
    stream is reset and the handler returns without re-raising.
    """
    remote_peer = stream.muxed_conn.peer_id
    expect_more = True
    while expect_more:
        try:
            expect_more = await _handle_ping(stream, remote_peer)
        except Exception:
            # Any failure ends the session; reset rather than close the stream.
            await stream.reset()
            return
async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool:
    """Handle a single ping from ``peer_id`` by echoing its payload back.

    Waits up to ``RESP_TIMEOUT`` for ``stream.read(PING_LENGTH)`` and writes
    the received bytes back unchanged.

    :return: ``True`` once the payload has been echoed (more pings may
        follow); ``False`` if the read ended with ``StreamEOF``.
    :raises trio.TooSlowError: no data arrived before the timeout expired
    :raises StreamReset: the stream was reset while reading
    :raises StreamClosed: the stream was closed while writing the reply
    :raises Exception: any other read error is logged and re-raised as is
    """
    try:
        with trio.fail_after(RESP_TIMEOUT):
            request = await stream.read(PING_LENGTH)
    except trio.TooSlowError as exc:
        logger.debug("Timed out waiting for ping from %s: %s", peer_id, exc)
        raise
    except StreamEOF:
        # A clean EOF is the normal way for the remote side to finish pinging.
        logger.debug("Other side closed while waiting for ping from %s", peer_id)
        return False
    except StreamReset as exc:
        logger.debug(
            "Other side reset while waiting for ping from %s: %s", peer_id, exc
        )
        raise
    except Exception as exc:
        logger.debug("Error while waiting to read ping for %s: %s", peer_id, exc)
        raise

    logger.debug("Received ping from %s with data: 0x%s", peer_id, request.hex())

    try:
        await stream.write(request)
    except StreamClosed:
        logger.debug("Fail to respond to ping from %s: stream closed", peer_id)
        raise
    else:
        return True
class PingService:
    """Open ping streams to peers through a host and measure round-trip time.

    RTT values are the integers returned by ``_ping`` (seconds scaled by
    ``10 ** 6``, i.e. microseconds).
    """

    def __init__(self, host: IHost):
        """
        :param host: host used to open new streams to the pinged peers
        """
        self._host = host

    async def ping(self, peer_id: PeerID) -> int:
        """Ping ``peer_id`` once over a freshly opened stream.

        The stream is closed both after a successful ping and when the ping
        raises an ``Exception``; in the latter case the exception is re-raised.

        :param peer_id: peer to ping
        :return: the RTT reported by ``_ping``
        """
        stream = await self._host.new_stream(peer_id, (ID,))
        try:
            elapsed = await _ping(stream)
        except Exception:
            await _close_stream(stream)
            raise
        await _close_stream(stream)
        return elapsed

    async def ping_loop(
        self, peer_id: PeerID, ping_amount: Union[int, float] = math.inf
    ) -> "PingIterator":
        """Open a stream to ``peer_id`` and return a ``PingIterator`` over it.

        :param peer_id: peer to ping
        :param ping_amount: limit handed to the iterator; with the default
            ``math.inf`` the iterator's counter never reaches the limit
        :return: an async iterator that owns the newly opened stream
        """
        return PingIterator(
            await self._host.new_stream(peer_id, (ID,)),
            ping_amount,
        )
class PingIterator:
    """Async iterator that pings repeatedly over one stream, yielding each RTT.

    Iteration stops once ``ping_amount`` pings have been issued, or earlier if
    ``trio.EndOfChannel`` is raised while pinging. In both cases the stream is
    closed before ``StopAsyncIteration`` is raised. Any other exception raised
    by a ping propagates to the caller and the stream is left open.
    """

    def __init__(self, stream: INetStream, ping_amount: Union[int, float]):
        """
        :param stream: stream to ping over
        :param ping_amount: maximum number of pings to issue; ``math.inf``
            never reaches the limit
        """
        self._stream = stream
        self._ping_limit = ping_amount
        # Number of pings issued so far.
        self._ping_counter = 0

    def __aiter__(self) -> "PingIterator":
        """Return ``self``; the object is its own async iterator."""
        return self

    async def __anext__(self) -> int:
        """Issue the next ping and return its RTT.

        :raises StopAsyncIteration: when the ping limit has been reached or
            ``trio.EndOfChannel`` is raised during the ping
        """
        # The counter holds the number of pings already issued, so the limit
        # is reached when it is *equal* to the requested amount. Comparing
        # with ``>`` issued one ping too many (and one ping for a limit of 0).
        if self._ping_counter >= self._ping_limit:
            await _close_stream(self._stream)
            raise StopAsyncIteration
        self._ping_counter += 1
        try:
            return await _ping(self._stream)
        except trio.EndOfChannel:
            await _close_stream(self._stream)
            raise StopAsyncIteration
async def _ping(stream: INetStream) -> int:
    """Send one ping over ``stream`` and return the round-trip time.

    Writes ``PING_LENGTH`` random bytes, reads the reply with
    ``stream.read(PING_LENGTH)`` and checks that it matches what was sent.

    :param stream: stream to ping over
    :return: time elapsed between starting the write and finishing the read,
        as integer microseconds (seconds scaled by ``10 ** 6``)
    :raises ValidationError: if the reply differs from the bytes sent
    """
    ping_bytes = secrets.token_bytes(PING_LENGTH)
    # Measure with a monotonic high-resolution clock: wall-clock time
    # (``time.time()``) can jump when the system clock is adjusted, which
    # would produce a negative or inflated RTT.
    before = time.perf_counter()
    await stream.write(ping_bytes)
    pong_bytes = await stream.read(PING_LENGTH)
    # Seconds -> integer microseconds.
    rtt = int((time.perf_counter() - before) * 10 ** 6)
    if ping_bytes != pong_bytes:
        raise ValidationError("Invalid PING response")
    return rtt
async def _close_stream(stream: INetStream) -> None:
    """Close ``stream`` on a best-effort basis.

    Any ``Exception`` raised by ``stream.close()`` is logged at debug level
    and suppressed, so cleanup never replaces the caller's own result or
    error.

    :param stream: stream to close
    """
    try:
        await stream.close()
    except Exception as error:
        # Deliberately not re-raised: closing is cleanup only.
        logger.debug("Error while closing ping stream: %s", error)