Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak dropped connection detection #185

Merged
merged 11 commits into from Oct 17, 2020
4 changes: 2 additions & 2 deletions httpcore/_async/connection.py
Expand Up @@ -165,8 +165,8 @@ def state(self) -> ConnectionState:
return ConnectionState.PENDING
return self.connection.get_state()

def is_connection_dropped(self) -> bool:
return self.connection is not None and self.connection.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.connection is not None and self.connection.is_socket_readable()

def mark_as_ready(self) -> None:
if self.connection is not None:
Expand Down
9 changes: 8 additions & 1 deletion httpcore/_async/connection_pool.py
Expand Up @@ -245,7 +245,14 @@ async def _get_connection_from_pool(
seen_http11 = True

if connection.state == ConnectionState.IDLE:
if connection.is_connection_dropped():
if connection.is_socket_readable():
# If the socket is readable while the connection is idle (meaning
# we don't expect the server to send any data), then the only valid
# reason is that the other end has disconnected, which means we
# should drop the connection too.
# (For a detailed run-through of what a "readable" socket is, and
# why this is the best thing for us to do here, see:
# https://github.com/encode/httpx/pull/143#issuecomment-515181778)
logger.trace("removing dropped idle connection=%r", connection)
# IDLE connections that have been dropped should be
# removed from the pool.
Expand Down
4 changes: 2 additions & 2 deletions httpcore/_async/http.py
Expand Up @@ -20,9 +20,9 @@ def mark_as_ready(self) -> None:
"""
raise NotImplementedError() # pragma: nocover

def is_connection_dropped(self) -> bool:
def is_socket_readable(self) -> bool:
"""
Return 'True' if the connection has been dropped by the remote end.
Return 'True' if the underlying network socket is readable.
"""
raise NotImplementedError() # pragma: nocover

Expand Down
4 changes: 2 additions & 2 deletions httpcore/_async/http11.py
Expand Up @@ -201,5 +201,5 @@ async def aclose(self) -> None:

await self.socket.aclose()

def is_connection_dropped(self) -> bool:
return self.socket.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.socket.is_readable()
4 changes: 2 additions & 2 deletions httpcore/_async/http2.py
Expand Up @@ -158,8 +158,8 @@ async def send_connection_init(self, timeout: TimeoutDict) -> None:
def is_closed(self) -> bool:
return False

def is_connection_dropped(self) -> bool:
return self.socket.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.socket.is_readable()

async def aclose(self) -> None:
logger.trace("close_connection=%r", self)
Expand Down
9 changes: 4 additions & 5 deletions httpcore/_backends/anyio.py
@@ -1,4 +1,3 @@
import select
from ssl import SSLContext
from typing import Optional

Expand All @@ -17,6 +16,7 @@
WriteTimeout,
)
from .._types import TimeoutDict
from .._utils import is_socket_readable
from .base import AsyncBackend, AsyncLock, AsyncSemaphore, AsyncSocketStream


Expand Down Expand Up @@ -85,10 +85,9 @@ async def aclose(self) -> None:
except BrokenResourceError as exc:
raise CloseError from exc

def is_connection_dropped(self) -> bool:
raw_socket = self.stream.extra(SocketAttribute.raw_socket)
rready, _wready, _xready = select.select([raw_socket], [], [], 0)
return bool(rready)
def is_readable(self) -> bool:
sock = self.stream.extra(SocketAttribute.raw_socket)
return is_socket_readable(sock.fileno())


class Lock(AsyncLock):
Expand Down
21 changes: 6 additions & 15 deletions httpcore/_backends/asyncio.py
@@ -1,4 +1,5 @@
import asyncio
import socket
from ssl import SSLContext
from typing import Optional

Expand All @@ -13,6 +14,7 @@
map_exceptions,
)
from .._types import TimeoutDict
from .._utils import is_socket_readable
from .base import AsyncBackend, AsyncLock, AsyncSemaphore, AsyncSocketStream

SSL_MONKEY_PATCH_APPLIED = False
Expand Down Expand Up @@ -171,21 +173,10 @@ async def aclose(self) -> None:
with map_exceptions({OSError: CloseError}):
self.stream_writer.close()

def is_connection_dropped(self) -> bool:
# Counter-intuitively, what we really want to know here is whether the socket is
# *readable*, i.e. whether it would return immediately with empty bytes if we
# called `.recv()` on it, indicating that the other end has closed the socket.
# See: https://github.com/encode/httpx/pull/143#issuecomment-515181778
#
# As it turns out, asyncio checks for readability in the background
# (see: https://github.com/encode/httpx/pull/276#discussion_r322000402),
# so checking for EOF or readability here would yield the same result.
#
# At the cost of rigour, we check for EOF instead of readability because asyncio
# does not expose any public API to check for readability.
# (For a solution that uses private asyncio APIs, see:
# https://github.com/encode/httpx/pull/143#issuecomment-515202982)
return self.stream_reader.at_eof()
def is_readable(self) -> bool:
transport = self.stream_reader._transport # type: ignore
sock: socket.socket = transport.get_extra_info("socket")
return is_socket_readable(sock.fileno())


class Lock(AsyncLock):
Expand Down
2 changes: 1 addition & 1 deletion httpcore/_backends/base.py
Expand Up @@ -63,7 +63,7 @@ async def write(self, data: bytes, timeout: TimeoutDict) -> None:
async def aclose(self) -> None:
raise NotImplementedError() # pragma: no cover

def is_connection_dropped(self) -> bool:
def is_readable(self) -> bool:
raise NotImplementedError() # pragma: no cover


Expand Down
9 changes: 3 additions & 6 deletions httpcore/_backends/curio.py
@@ -1,4 +1,3 @@
import select
from ssl import SSLContext, SSLSocket
from typing import Optional

Expand All @@ -15,7 +14,7 @@
map_exceptions,
)
from .._types import TimeoutDict
from .._utils import get_logger
from .._utils import get_logger, is_socket_readable
from .base import AsyncBackend, AsyncLock, AsyncSemaphore, AsyncSocketStream

logger = get_logger(__name__)
Expand Down Expand Up @@ -132,10 +131,8 @@ async def aclose(self) -> None:
await self.stream.close()
await self.socket.close()

def is_connection_dropped(self) -> bool:
rready, _, _ = select.select([self.socket.fileno()], [], [], 0)

return bool(rready)
def is_readable(self) -> bool:
return is_socket_readable(self.socket.fileno())


class CurioBackend(AsyncBackend):
Expand Down
7 changes: 3 additions & 4 deletions httpcore/_backends/sync.py
@@ -1,4 +1,3 @@
import select
import socket
import threading
import time
Expand All @@ -17,6 +16,7 @@
map_exceptions,
)
from .._types import TimeoutDict
from .._utils import is_socket_readable


class SyncSocketStream:
Expand Down Expand Up @@ -77,9 +77,8 @@ def close(self) -> None:
with map_exceptions({socket.error: CloseError}):
self.sock.close()

def is_connection_dropped(self) -> bool:
rready, _wready, _xready = select.select([self.sock], [], [], 0)
return bool(rready)
def is_readable(self) -> bool:
return is_socket_readable(self.sock.fileno())


class SyncLock:
Expand Down
6 changes: 1 addition & 5 deletions httpcore/_backends/trio.py
Expand Up @@ -82,7 +82,7 @@ async def aclose(self) -> None:
with map_exceptions({trio.BrokenResourceError: CloseError}):
await self.stream.aclose()

def is_connection_dropped(self) -> bool:
def is_readable(self) -> bool:
# Adapted from: https://github.com/encode/httpx/pull/143#issuecomment-515202982
stream = self.stream

Expand All @@ -91,10 +91,6 @@ def is_connection_dropped(self) -> bool:
stream = stream.transport_stream
assert isinstance(stream, trio.SocketStream)

# Counter-intuitively, what we really want to know here is whether the socket is
# *readable*, i.e. whether it would return immediately with empty bytes if we
# called `.recv()` on it, indicating that the other end has closed the socket.
# See: https://github.com/encode/httpx/pull/143#issuecomment-515181778
return stream.socket.is_readable()


Expand Down
4 changes: 2 additions & 2 deletions httpcore/_sync/connection.py
Expand Up @@ -165,8 +165,8 @@ def state(self) -> ConnectionState:
return ConnectionState.PENDING
return self.connection.get_state()

def is_connection_dropped(self) -> bool:
return self.connection is not None and self.connection.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.connection is not None and self.connection.is_socket_readable()

def mark_as_ready(self) -> None:
if self.connection is not None:
Expand Down
9 changes: 8 additions & 1 deletion httpcore/_sync/connection_pool.py
Expand Up @@ -245,7 +245,14 @@ def _get_connection_from_pool(
seen_http11 = True

if connection.state == ConnectionState.IDLE:
if connection.is_connection_dropped():
if connection.is_socket_readable():
# If the socket is readable while the connection is idle (meaning
# we don't expect the server to send any data), then the only valid
# reason is that the other end has disconnected, which means we
# should drop the connection too.
# (For a detailed run-through of what a "readable" socket is, and
# why this is the best thing for us to do here, see:
# https://github.com/encode/httpx/pull/143#issuecomment-515181778)
logger.trace("removing dropped idle connection=%r", connection)
# IDLE connections that have been dropped should be
# removed from the pool.
Expand Down
4 changes: 2 additions & 2 deletions httpcore/_sync/http.py
Expand Up @@ -20,9 +20,9 @@ def mark_as_ready(self) -> None:
"""
raise NotImplementedError() # pragma: nocover

def is_connection_dropped(self) -> bool:
def is_socket_readable(self) -> bool:
"""
Return 'True' if the connection has been dropped by the remote end.
Return 'True' if the underlying network socket is readable.
"""
raise NotImplementedError() # pragma: nocover

Expand Down
4 changes: 2 additions & 2 deletions httpcore/_sync/http11.py
Expand Up @@ -201,5 +201,5 @@ def close(self) -> None:

self.socket.close()

def is_connection_dropped(self) -> bool:
return self.socket.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.socket.is_readable()
4 changes: 2 additions & 2 deletions httpcore/_sync/http2.py
Expand Up @@ -158,8 +158,8 @@ def send_connection_init(self, timeout: TimeoutDict) -> None:
def is_closed(self) -> bool:
return False

def is_connection_dropped(self) -> bool:
return self.socket.is_connection_dropped()
def is_socket_readable(self) -> bool:
return self.socket.is_readable()

def close(self) -> None:
logger.trace("close_connection=%r", self)
Expand Down
24 changes: 24 additions & 0 deletions httpcore/_utils.py
@@ -1,6 +1,7 @@
import itertools
import logging
import os
import select
import sys
import typing

Expand Down Expand Up @@ -70,3 +71,26 @@ def exponential_backoff(factor: float) -> typing.Iterator[float]:
yield 0
for n in itertools.count(2):
yield factor * (2 ** (n - 2))


def is_socket_readable(sock_fd: int) -> bool:
    """
    Return whether a socket, as identified by its file descriptor, is readable.

    "A socket is readable" means that calling .recv() on it would return
    immediately rather than block: either the read buffer holds some data, or
    the other end has closed the connection (in which case .recv() returns b"").

    A negative file descriptor (the value `socket.fileno()` reports on failure)
    cannot be polled at all. It is reported as readable, so that callers which
    use readability of an idle socket as a "connection dropped" signal discard
    the connection instead of crashing with a `ValueError`.
    """
    # NOTE: we want to check for readability without actually attempting to read,
    # because we don't want to block forever if it's not readable.

    if sock_fd < 0:
        return True

    # The implementation below was stolen from:
    # https://github.com/python-trio/trio/blob/20ee2b1b7376db637435d80e266212a35837ddcc/trio/_socket.py#L471-L478
    # See also: https://github.com/encode/httpcore/pull/193#issuecomment-703129316

    # Use select.select on Windows and wherever select.poll is unavailable
    # (it is not provided on every platform), and select.poll everywhere else.
    # poll is preferred because select.select rejects file descriptors beyond
    # what select() supports on the platform.
    if sys.platform == "win32" or getattr(select, "poll", None) is None:
        rready, _, _ = select.select([sock_fd], [], [], 0)
        return bool(rready)

    poller = select.poll()
    poller.register(sock_fd, select.POLLIN)
    return bool(poller.poll(0))
2 changes: 1 addition & 1 deletion tests/async_tests/test_connection_pool.py
Expand Up @@ -49,7 +49,7 @@ def info(self) -> str:
def mark_as_ready(self) -> None:
self.state = ConnectionState.READY

def is_connection_dropped(self) -> bool:
def is_socket_readable(self) -> bool:
return False


Expand Down
31 changes: 30 additions & 1 deletion tests/async_tests/test_interfaces.py
Expand Up @@ -156,7 +156,7 @@ async def test_http_request_cannot_reuse_dropped_connection(

# Mock the connection as having been dropped.
connection = list(http._connections[url[:3]])[0] # type: ignore
connection.is_connection_dropped = lambda: True # type: ignore
connection.is_socket_readable = lambda: True # type: ignore

method = b"GET"
url = (b"http", *server.netloc, b"/")
Expand Down Expand Up @@ -361,3 +361,32 @@ async def test_explicit_backend_name(server: Server) -> None:
assert status_code == 200
assert ext == {"http_version": "HTTP/1.1", "reason": "OK"}
assert len(http._connections[url[:3]]) == 1 # type: ignore


@pytest.mark.anyio
@pytest.mark.usefixtures("too_many_open_files_minus_one")
@pytest.mark.skipif(platform.system() != "Linux", reason="Only a problem on Linux")
async def test_broken_socket_detection_many_open_files(
    backend: str, server: Server
) -> None:
    """
    Regression test for: https://github.com/encode/httpcore/issues/182

    Issues two consecutive GET requests through one connection pool while the
    `too_many_open_files_minus_one` fixture is active, and checks that both
    succeed and that the pool holds exactly one connection for the origin.
    """
    async with httpcore.AsyncConnectionPool(backend=backend) as http:
        request_url = (b"http", *server.netloc, b"/")
        request_headers = [server.host_header]
        origin = request_url[:3]

        # Two attempts are needed to exercise the regression:
        # * The first one succeeds because it grabs the last available fd
        #   before what select() supports on the platform.
        # * The second one would have failed without a fix, due to a
        #   "filedescriptor out of range in select()" exception.
        for _attempt in range(2):
            response = await http.arequest(b"GET", request_url, request_headers)
            status_code, _response_headers, stream, ext = response
            await read_body(stream)

            assert status_code == 200
            assert ext == {"http_version": "HTTP/1.1", "reason": "OK"}
            assert len(http._connections[origin]) == 1  # type: ignore
26 changes: 26 additions & 0 deletions tests/conftest.py
Expand Up @@ -73,3 +73,29 @@ def server() -> Server:
@pytest.fixture(scope="session")
def https_server() -> Server:
return Server(SERVER_HOST, port=443)


@pytest.fixture(scope="function")
def too_many_open_files_minus_one() -> typing.Iterator[None]:
    """
    Hold open every low-numbered file descriptor for the duration of a test.

    Regression fixture for https://github.com/encode/httpcore/issues/182.

    Keeps opening "/dev/null" until the descriptor returned is greater than
    `max_num_descriptors - 1`, retaining all the earlier ones, then yields to
    the test. Every retained file is closed on teardown, and also if opening
    one of them fails part-way through setup.
    """
    # The limit comes from the select(2) man page:
    # https://man7.org/linux/man-pages/man2/select.2.html#top_of_page
    # "To monitor file descriptors greater than 1023, use poll or epoll instead."
    max_num_descriptors = 1023

    files = []

    # Acquisition happens inside the `try` so that a failing open() (e.g. the
    # process running into its own open-files limit first) still releases the
    # descriptors collected so far.
    try:
        while True:
            f = open("/dev/null")
            # Leave one file descriptor available for a transport to perform
            # a successful request.
            if f.fileno() > max_num_descriptors - 1:
                f.close()
                break
            files.append(f)

        yield
    finally:
        for f in files:
            f.close()
2 changes: 1 addition & 1 deletion tests/sync_tests/test_connection_pool.py
Expand Up @@ -49,7 +49,7 @@ def info(self) -> str:
def mark_as_ready(self) -> None:
self.state = ConnectionState.READY

def is_connection_dropped(self) -> bool:
def is_socket_readable(self) -> bool:
return False


Expand Down