Skip to content

Commit

Permalink
Ping first if conn is idle for too long (#365)
Browse files Browse the repository at this point in the history
* Ping first if conn is idle for too long

The blocking sockets don't have active connection_lost() events, if the
user is trying to execute a non-read-only query over a disconnected
connection, is_closed() cannot capture the disconnection and the retry
loop wouldn't work. In this case, we are sending a SYNC before the
query, so that we could capture the connection error and reconnect.

This is tailored to handle the server-side `session_idle_timeout`; for
regular network interruptions within the `session_idle_timeout`, we
don't want to sacrifice performance by inserting more frequent SYNCs
just to avoid connection errors on non-read-only queries.

* Also fixing racy blocking proto disconnect
  • Loading branch information
fantix committed Oct 21, 2022
1 parent 33a912c commit 99cf78a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
29 changes: 28 additions & 1 deletion edgedb/blocking_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#


import datetime
import queue
import socket
import ssl
Expand All @@ -32,8 +33,12 @@
from .protocol import blocking_proto


DEFAULT_PING_BEFORE_IDLE_TIMEOUT = datetime.timedelta(seconds=5)
MINIMUM_PING_WAIT_TIME = datetime.timedelta(seconds=1)


class BlockingIOConnection(base_client.BaseConnection):
__slots__ = ()
__slots__ = ("_ping_wait_time",)

async def connect_addr(self, addr, timeout):
deadline = time.monotonic() + timeout
Expand Down Expand Up @@ -97,6 +102,16 @@ async def connect_addr(self, addr, timeout):

self._protocol = proto
self._addr = addr
self._ping_wait_time = max(
(
getattr(
self.get_settings().get("system_config"),
"session_idle_timeout",
)
- DEFAULT_PING_BEFORE_IDLE_TIMEOUT
),
MINIMUM_PING_WAIT_TIME,
).total_seconds()

except Exception:
sock.close()
Expand Down Expand Up @@ -131,6 +146,18 @@ def _dispatch_log_message(self, msg):
for cb in self._log_listeners:
cb(self, msg)

async def raw_query(self, query_context: abstract.QueryContext):
try:
if (
time.monotonic() - self._protocol.last_active_timestamp
> self._ping_wait_time
):
await self._protocol._sync()
except errors.ClientConnectionError:
await self.connect()

return await super().raw_query(query_context)


class _PoolConnectionHolder(base_client.PoolConnectionHolder):
__slots__ = ()
Expand Down
1 change: 1 addition & 0 deletions edgedb/protocol/blocking_proto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ cdef class BlockingIOProtocol(protocol.SansIOProtocolBackwardsCompatible):
cdef:
readonly object sock
float deadline
readonly object last_active_timestamp

cdef _disconnect(self)
11 changes: 7 additions & 4 deletions edgedb/protocol/blocking_proto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ cdef class BlockingIOProtocol(protocol.SansIOProtocolBackwardsCompatible):

cdef _disconnect(self):
self.connected = False
if self.sock is not None:
sock, self.sock = self.sock, None
if sock is not None:
try:
self.sock.shutdown(socket.SHUT_RDWR)
sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.sock.close()
self.sock = None
sock.close()

cdef write(self, WriteBuffer buf):
try:
Expand Down Expand Up @@ -93,6 +93,7 @@ cdef class BlockingIOProtocol(protocol.SansIOProtocolBackwardsCompatible):
self._disconnect()
raise errors.ClientConnectionClosedError()
self.buffer.feed_data(data)
self.last_active_timestamp = time.monotonic()

async def try_recv_eagerly(self):
if self.buffer.take_message():
Expand All @@ -112,6 +113,8 @@ cdef class BlockingIOProtocol(protocol.SansIOProtocolBackwardsCompatible):
except OSError as e:
self._disconnect()
raise con_utils.wrap_error(e) from e
else:
self.last_active_timestamp = time.monotonic()
finally:
self.sock.settimeout(None)

Expand Down

0 comments on commit 99cf78a

Please sign in to comment.