Skip to content

Commit

Permalink
Merge pull request #337 from liamstask/connections-iter-lock
Browse files Browse the repository at this point in the history
connections: add threadsafe selector wrapper
  • Loading branch information
jaraco committed Nov 24, 2020
2 parents adb2b80 + 0b63913 commit c5ade2b
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 63 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
.. scm-version-title:: v8.4.8

- :issue:`317` via :pr:`337`: Fixed a regression in
8.4.5 where the connections dictionary would change
size during iteration, leading to a RuntimeError raised
in the logs -- by :user:`liamstask`.

.. scm-version-title:: v8.4.7

- :pr:`334`: Started filtering out TLS/SSL errors when
Expand Down
139 changes: 101 additions & 38 deletions cheroot/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io
import os
import socket
import threading
import time

from . import errors
Expand Down Expand Up @@ -49,6 +50,68 @@ def prevent_socket_inheritance(sock):
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)


class _ThreadsafeSelector:
"""Thread-safe wrapper around a DefaultSelector.
There are 2 thread contexts in which it may be accessed:
* the selector thread
* one of the worker threads in workers/threadpool.py
The expected read/write patterns are:
* :py:meth:`iter`: selector thread
* :py:meth:`register`: selector thread and threadpool, via put()
* :py:meth:`unregister`: selector thread only
Notably, this means :py:class:`_ThreadsafeSelector` never needs to worry
that connections will be removed behind its back.
The lock is held when iterating or modifying the selector but is not
required when :py:meth:`~selectors.BaseSelector.select`-ing on it.
"""

def __init__(self):
self._selector = selectors.DefaultSelector()
self._lock = threading.Lock()

def __len__(self):
with self._lock:
return len(self._selector.get_map() or {})

@property
def connections(self):
"""Retrieve connections registered with the selector."""
with self._lock:
mapping = self._selector.get_map() or {}
for _, (_, sock_fd, _, conn) in mapping.items():
yield (sock_fd, conn)

def register(self, fileobj, events, data=None):
"""Register ``fileobj`` with the selector."""
with self._lock:
return self._selector.register(fileobj, events, data)

def unregister(self, fileobj):
"""Unregister ``fileobj`` from the selector."""
with self._lock:
return self._selector.unregister(fileobj)

def select(self, timeout=None):
"""Return socket fd and data pairs from selectors.select call.
Returns entries ready to read in the form:
(socket_file_descriptor, connection)
"""
return (
(key.fd, key.data)
for key, _ in self._selector.select(timeout=timeout)
)

def close(self):
"""Close the selector."""
with self._lock:
self._selector.close()


class ConnectionManager:
"""Class which manages HTTPConnection objects.
Expand All @@ -64,7 +127,7 @@ def __init__(self, server):
"""
self.server = server
self._readable_conns = collections.deque()
self._selector = selectors.DefaultSelector()
self._selector = _ThreadsafeSelector()

self._selector.register(
server.socket.fileno(),
Expand Down Expand Up @@ -100,15 +163,14 @@ def expire(self):
threshold = time.time() - self.server.timeout
timed_out_connections = [
(sock_fd, conn)
for _, (_, sock_fd, _, conn)
in self._selector.get_map().items()
for (sock_fd, conn) in self._selector.connections
if conn != self.server and conn.last_used < threshold
]
for sock_fd, conn in timed_out_connections:
self._selector.unregister(sock_fd)
conn.close()

def get_conn(self): # noqa: C901 # FIXME
def get_conn(self):
"""Return a HTTPConnection object which is ready to be handled.
A connection returned by this method should be ready for a worker
Expand All @@ -130,47 +192,53 @@ def get_conn(self): # noqa: C901 # FIXME
# The timeout value impacts performance and should be carefully
# chosen. Ref:
# github.com/cherrypy/cheroot/issues/305#issuecomment-663985165
rlist = [
key for key, _
in self._selector.select(timeout=0.01)
]
active_list = self._selector.select(timeout=0.01)
except OSError:
# Mark any connection which no longer appears valid
invalid_entries = []
for _, key in self._selector.get_map().items():
# If the server socket is invalid, we'll just ignore it and
# wait to be shutdown.
if key.data == self.server:
continue

try:
os.fstat(key.fd)
except OSError:
invalid_entries.append((key.fd, key.data))

for sock_fd, conn in invalid_entries:
self._selector.unregister(sock_fd)
conn.close()

self._remove_invalid_sockets()
# Wait for the next tick to occur.
return None

for key in rlist:
if key.data is self.server:
for (sock_fd, conn) in active_list:
if conn is self.server:
# New connection
return self._from_server_socket(self.server.socket)

conn = key.data
# unregister connection from the selector until the server
# has read from it and returned it via put()
self._selector.unregister(key.fd)
self._selector.unregister(sock_fd)
self._readable_conns.append(conn)

try:
return self._readable_conns.popleft()
except IndexError:
return None

def _remove_invalid_sockets(self):
"""Clean up the resources of any broken connections.
This method attempts to detect any connections in an invalid state,
unregisters them from the selector and closes the file descriptors of
the corresponding network sockets where possible.
"""
invalid_conns = []
for sock_fd, conn in self._selector.connections:
if conn is self.server:
continue

try:
os.fstat(sock_fd)
except OSError:
invalid_conns.append((sock_fd, conn))

for sock_fd, conn in invalid_conns:
self._selector.unregister(sock_fd)
# One of the reason on why a socket could cause an error
# is that the socket is already closed, ignore the
# socket error if we try to close it at this point.
# This is equivalent to OSError in Py3
with suppress(socket.error):
conn.close()

def _from_server_socket(self, server_socket): # noqa: C901 # FIXME
try:
s, addr = server_socket.accept()
Expand Down Expand Up @@ -267,14 +335,9 @@ def close(self):
conn.close()
self._readable_conns.clear()

connections = (
conn.data.socket
for _, conn in (self._selector.get_map() or {}).items()
if conn.data != self.server # server closes its own socket
)
for connection in connections:
connection.close()

for (_, conn) in self._selector.connections:
if conn is not self.server: # server closes its own socket
conn.close()
self._selector.close()

@property
Expand All @@ -285,7 +348,7 @@ def _num_connections(self):
minus one for the server socket, which is always registered
with the selector.
"""
return len(self._readable_conns) + len(self._selector.get_map()) - 1
return len(self._readable_conns) + len(self._selector) - 1

@property
def can_add_keepalive_connection(self):
Expand Down
29 changes: 17 additions & 12 deletions cheroot/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import platform
import contextlib
import threading
import errno

try:
from functools import lru_cache
Expand Down Expand Up @@ -1354,6 +1355,9 @@ def close(self):

if not self.linger:
self._close_kernel_socket()
# close the socket file descriptor
# (will be closed in the OS if there is no
# other reference to the underlying socket)
self.socket.close()
else:
# On the other hand, sometimes we want to hang around for a bit
Expand Down Expand Up @@ -1465,18 +1469,19 @@ def peer_group(self):
return group

def _close_kernel_socket(self):
"""Close kernel socket in outdated Python versions.
On old Python versions,
Python's socket module does NOT call close on the kernel
socket when you call socket.close(). We do so manually here
because we want this server to send a FIN TCP segment
immediately. Note this must be called *before* calling
socket.close(), because the latter drops its reference to
the kernel socket.
"""
if six.PY2 and hasattr(self.socket, '_sock'):
self.socket._sock.close()
"""Terminate the connection at the transport level."""
# Honor ``sock_shutdown`` for PyOpenSSL connections.
shutdown = getattr(
self.socket, 'sock_shutdown',
self.socket.shutdown,
)

try:
shutdown(socket.SHUT_RDWR) # actually send a TCP FIN
except socket.error as e:
# Suppress "client is no longer connected"
if e.errno != errno.ENOTCONN:
raise


class HTTPServer:
Expand Down
19 changes: 6 additions & 13 deletions cheroot/test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,19 +1092,19 @@ def __init__(self, original_get_map):
"""Initilize helper class to wrap the selector.get_map method."""
self.original_get_map = original_get_map
self.sabotage_conn = False
self.socket_closed = False
self.conn_closed = False

def __call__(self):
"""Intercept the calls to selector.get_map."""
sabotage_targets = (
conn for _, (_, _, _, conn) in self.original_get_map().items()
if isinstance(conn, cheroot.server.HTTPConnection)
) if self.sabotage_conn else ()
) if self.sabotage_conn and not self.conn_closed else ()

for conn in sabotage_targets:
# close the socket to cause OSError
conn.close()
self.socket_closed = True
self.conn_closed = True

return self.original_get_map()

Expand All @@ -1126,11 +1126,11 @@ def test_invalid_selected_connection(test_client, monkeypatch):

# patch the get_map method
faux_get_map = FaultyGetMap(
test_client.server_instance._connections._selector.get_map,
test_client.server_instance._connections._selector._selector.get_map,
)

monkeypatch.setattr(
test_client.server_instance._connections._selector,
test_client.server_instance._connections._selector._selector,
'get_map',
faux_get_map,
)
Expand All @@ -1147,11 +1147,4 @@ def test_invalid_selected_connection(test_client, monkeypatch):
# give time to make sure the error gets handled
time.sleep(0.2)
assert faux_select.os_error_triggered
assert faux_get_map.socket_closed
# any error in the error handling should be catched by the
# teardown verification for the error_log

if six.PY2:
test_client.server_instance.error_log.ignored_msgs.append(
'Error in HTTPServer.tick',
)
assert faux_get_map.conn_closed
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tuple
tuples
unbuffered
unclosed
unregister
uri
url
username
Expand Down

0 comments on commit c5ade2b

Please sign in to comment.