Skip to content

Commit

Permalink
Merge pull request #344 from carver/drop-old-sessions
Browse files Browse the repository at this point in the history
Drop old sessions
  • Loading branch information
carver committed May 13, 2021
2 parents 82b0e5d + 3717270 commit 13a5752
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 5 deletions.
24 changes: 23 additions & 1 deletion ddht/v5_1/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Any,
AsyncContextManager,
Collection,
Container,
NamedTuple,
Optional,
Protocol,
Expand Down Expand Up @@ -101,6 +102,27 @@ def is_timed_out(self) -> bool:
def timeout_at(self) -> float:
...

@property
@abstractmethod
def stale_at(self) -> float:
"""
At what (trio) time will the session be "stale"?
A session becomes stale when the other peer has not sent any message
for SESSION_IDLE_TIMEOUT.
"""
...

@property
@abstractmethod
def is_stale(self) -> bool:
"""
Is the current session stale?
See :meth:`~stale_at` for definition of stale.
"""
...

#
# Handshake Status
#
Expand Down Expand Up @@ -183,7 +205,7 @@ class EventsAPI(ABC):
topic_query_received: EventAPI[InboundMessage[TopicQueryMessage]]


class PoolAPI(ABC):
class PoolAPI(ABC, Container[uuid.UUID]):
local_private_key: keys.PrivateKey
local_node_id: NodeID

Expand Down
3 changes: 3 additions & 0 deletions ddht/v5_1/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ async def run(self) -> None:
# Update the ENR if an explicit listening address was provided
enr_manager.update((IP_V4_ADDRESS_ENR_KEY, listen_on_ip_address.packed))

if listen_on_ip_address.is_loopback:
raise Exception("Cannot bind to localhost. Must choose a different IP")

listen_on = Endpoint(listen_on_ip_address.packed, self._boot_info.port)

if self._boot_info.is_upnp_enabled:
Expand Down
25 changes: 21 additions & 4 deletions ddht/v5_1/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,33 @@ async def _monitor_session_timeout(self, session: SessionAPI) -> None:
"""
Monitor for the session to timeout, removing it from the pool.
"""
with trio.move_on_after(SESSION_IDLE_TIMEOUT) as scope:
await session.await_handshake_completion()

if scope.cancelled_caught:
try:
with trio.fail_after(SESSION_IDLE_TIMEOUT):
await session.await_handshake_completion()
except trio.TooSlowError:
try:
self._pool.remove_session(session.id)
except SessionNotFound:
pass
else:
await self._events.session_timeout.trigger(session)
return

while True:
await trio.sleep(SESSION_IDLE_TIMEOUT)

if session.id not in self._pool:
break
elif session.is_stale:
try:
self._pool.remove_session(session.id)
except SessionNotFound:
pass
else:
self.logger.debug("Dropping %s: reason=stale", session)
await self._events.session_timeout.trigger(session)
finally:
break

def _get_sessions_for_inbound_envelope(
self, envelope: InboundEnvelope
Expand Down
3 changes: 3 additions & 0 deletions ddht/v5_1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def _evict_session(self, key: uuid.UUID, value: SessionAPI) -> None:
# callback to remove LRU evicted session from _sessions_by_endpoint
self._sessions_by_endpoint[value.remote_endpoint].remove(value)

def __contains__(self, session_id: object) -> bool:
return session_id in self._sessions

def remove_session(self, session_id: uuid.UUID) -> SessionAPI:
try:
session = self._sessions[session_id]
Expand Down
13 changes: 13 additions & 0 deletions ddht/v5_1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class SessionStatus(enum.Enum):
class BaseSession(SessionAPI):
_remote_node_id: NodeID
_keys: SessionKeys
_last_message_received_at: Optional[float] = None

_handshake_scheme_registry: HandshakeSchemeRegistryAPI = v51_handshake_scheme_registry

Expand Down Expand Up @@ -142,6 +143,17 @@ def is_timed_out(self) -> bool:
def timeout_at(self) -> float:
return self.created_at + SESSION_IDLE_TIMEOUT

@property
def stale_at(self) -> float:
if self._last_message_received_at is None:
return self.created_at + SESSION_IDLE_TIMEOUT
else:
return self._last_message_received_at + SESSION_IDLE_TIMEOUT

@property
def is_stale(self) -> bool:
return self.stale_at <= trio.current_time()

@property
def local_enr(self) -> ENRAPI:
return self._enr_db.get_enr(self._local_node_id)
Expand Down Expand Up @@ -294,6 +306,7 @@ async def handle_inbound_envelope(self, envelope: InboundEnvelope) -> bool:
await self._events.packet_discarded.trigger((self, envelope))
return False
else:
self._last_message_received_at = trio.current_time()
await self._inbound_message_send_channel.send(
AnyInboundMessage(
message=message,
Expand Down
3 changes: 3 additions & 0 deletions newsfragments/344.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Drop peer session if no messages received for 60 seconds. Fail immediately if you try to launch with
127.0.0.1, because you can't send outbound messages away from your machine that way. (and get ugly
stack traces and a crash)

0 comments on commit 13a5752

Please sign in to comment.