Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure we don't loop trying to write to a channel thats not connected (fix 100% CPU) #419

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ build/
coverage.xml
docs/_themes
docs/_build
.venv/
14 changes: 9 additions & 5 deletions src/waitress/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class HTTPChannel(wasyncore.dispatcher):
last_activity = 0 # Time of last activity
will_close = False # set to True to close the socket.
close_when_flushed = False # set to True to close the socket when flushed
closed = False # set to True when closed not just due to being disconnected at the start
sent_continue = False # used as a latch after sending 100 continue
total_outbufs_len = 0 # total bytes ready to send
current_outbuf_count = 0 # total bytes written to current outbuf
Expand All @@ -67,6 +68,9 @@ def __init__(self, server, sock, addr, adj, map=None):
self.outbuf_lock = threading.Condition()

wasyncore.dispatcher.__init__(self, sock, map=map)
if not self.connected:
# Sometimes can be closed quickly and getpeername fails.
self.handle_close()
Copy link
Author

@djay djay Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@digitalresistor @d-mauer I'm still not sure on this fix. I think I read somewhere how windows can sometimes fail on getpeername?
The other fix will still prevent the looping bug by letting it write and error out. This one will close it before it wastes the app time if indeed the connection really is closed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If getpeername() fails on Windows then it would get self.connected set to False anyway, this would cause the bug. So trying to keep going after getpeername() failed is not sustainable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connect = False doesn't cause the bug alone. It also needs the request to be malformed as that prevents both reading and writing and maintainance from cleaning it up. So in someways the real bug is in handle_write

if not self.connected:
    return

The test i put in shows that in the most likely scenario that makes this occur it's trying to close the channel but is prevented from doing so by the above line.

The more I think about it @d-maurer is correct that this should be changed to self.closed or something that explicitly prevents a close from happening twice. Thats the safest minimal change.


# Don't let wasyncore.dispatcher throttle self.addr on us.
self.addr = addr
Expand All @@ -86,15 +90,15 @@ def writable(self):
# the channel (possibly by our server maintenance logic), run
# handle_write

return self.total_outbufs_len or self.will_close or self.close_when_flushed
return (self.total_outbufs_len or self.will_close or self.close_when_flushed)

def handle_write(self):
# Precondition: there's data in the out buffer to be sent, or
# there's a pending will_close request

if not self.connected:
# we dont want to close the channel twice

if self.closed:
# we dont want to close the channel twice.
# but we need let the channel close if it's marked to close
return

# try to flush any pending output
Expand Down Expand Up @@ -150,7 +154,6 @@ def readable(self):
# 3. There are not too many tasks already queued
# 4. There's no data in the output buffer that needs to be sent
# before we potentially create a new task.

return not (
self.will_close
or self.close_when_flushed
Expand Down Expand Up @@ -314,6 +317,7 @@ def handle_close(self):
self.total_outbufs_len = 0
self.connected = False
self.outbuf_lock.notify()
self.closed = True
wasyncore.dispatcher.close(self)

def add_channel(self, map=None):
Expand Down
21 changes: 18 additions & 3 deletions tests/test_channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from errno import EINVAL
import io
import socket
import unittest

import pytest
Expand All @@ -11,10 +13,10 @@ def _makeOne(self, sock, addr, adj, map=None):
server = DummyServer()
return HTTPChannel(server, sock, addr, adj=adj, map=map)

def _makeOneWithMap(self, adj=None):
def _makeOneWithMap(self, adj=None, sock_shutdown=False):
if adj is None:
adj = DummyAdjustments()
sock = DummySock()
sock = DummySock(shutdown=sock_shutdown)
map = {}
inst = self._makeOne(sock, "127.0.0.1", adj, map=map)
inst.outbuf_lock = DummyLock()
Expand Down Expand Up @@ -65,8 +67,18 @@ def test_writable_nothing_in_outbuf_will_close(self):
def test_handle_write_not_connected(self):
inst, sock, map = self._makeOneWithMap()
inst.connected = False
# TODO: handle_write never returns anything anyway
self.assertFalse(inst.handle_write())

def test_handle_write_not_connected_but_will_close(self):
inst, sock, map = self._makeOneWithMap()
inst.connected = False
inst.will_close = True
# https://github.com/Pylons/waitress/issues/418
# Ensure we actually handle_close even if not connected
self.assertFalse(inst.handle_write())
self.assertEqual(len(map), 0)

def test_handle_write_with_requests(self):
inst, sock, map = self._makeOneWithMap()
inst.requests = True
Expand Down Expand Up @@ -906,8 +918,9 @@ class DummySock:
blocking = False
closed = False

def __init__(self):
def __init__(self, shutdown=False):
self.sent = b""
self.shutdown = shutdown

def setblocking(self, *arg):
self.blocking = True
Expand All @@ -916,6 +929,8 @@ def fileno(self):
return 100

def getpeername(self):
if self.shutdown:
raise OSError(EINVAL)
return "127.0.0.1"

def getsockopt(self, level, option):
Expand Down
156 changes: 156 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import errno
import select
import socket
import struct
from threading import Event
from time import sleep
import time
import unittest
from waitress.channel import HTTPChannel

dummy_app = object()

Expand Down Expand Up @@ -311,6 +317,98 @@ def test_create_with_one_socket_handle_accept_noerror(self):
self.assertEqual(innersock.opts, [("level", "optname", "value")])
self.assertEqual(L, [(inst, innersock, None, inst.adj)])

def test_error_request_quick_shutdown(self):
""" Issue found in production that led to 100% useage because getpeername failed after accept but before channel setup.
"""
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)]
sockets[0].setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0)
sockets[0].bind(("127.0.0.1", 8000))
sockets[0].listen()
inst = self._makeWithSockets(_start=False, sockets=sockets)
channels = []
inst.channel_class = make_quick_shutdown_channel(client, channels)
inst.task_dispatcher = DummyTaskDispatcher()

# This will make getpeername fail fast with EINVAL OSError
client.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
client.connect(("127.0.0.1", 8000))
client.send(b"1") # Send our fake request before we accept and close the connection
inst.handle_accept() # ShutdownServer will close the connection after acceot but before getpeername
self.assertRaises(OSError, sockets[0].getpeername)
self.assertFalse(channels[0].connected, "race condition means our socket is marked not connected")
self.assertNotIn(channels[0], inst._map.values(), "we should get an automatic close")

def test_error_request_no_loop(self):
""" Issue found in production that led to 100% useage because getpeername failed after accept but before channel setup.
"""
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)]
sockets[0].setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0)
sockets[0].bind(("127.0.0.1", 8000))
sockets[0].listen()
inst = self._makeWithSockets(_start=False, sockets=sockets)
channels = []
inst.channel_class = make_quick_shutdown_channel(client, channels, shutdown=False)
inst.task_dispatcher = DummyTaskDispatcher()

# This will make getpeername fail fast with EINVAL OSError
client.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
client.connect(("127.0.0.1", 8000))
client.send(b"1") # Send our fake request before we accept and close the connection
inst.handle_accept() # ShutdownServer will close the connection after acceot but before getpeername
self.assertRaises(OSError, sockets[0].getpeername)
self.assertEquals(len(channels), 1)
channels[0].connected = False # This used to create a 100% CPU loop

server_run_for_count(inst, 1) # Read the request
self.assertTrue(channels[0].requests[0].error, "for this bug we need the request to have a parsing error")
server_run_for_count(inst, 5)
# simulate thread processing the request
channels[0].service()
self.assertTrue(channels[0].close_when_flushed, "This prevents reads (which lead to close) and loops on handle_write (with 100% CPU)")
server_run_for_count(inst, 5) # Our loop
self.assertNotIn(channels[0], inst._map.values(), "broken request didn't close the channel")
self.assertEqual(channels[0].count_close, 1, "but also this connection never gets closed")
self.assertLess(channels[0].count_writes, 5, "We're supposed to be in a loop trying to write but can't")

def test_error_request_maintainace_cleanup(self):
""" Issue found in production that led to 100% useage because getpeername failed after accept but before channel setup.
"""
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)]
sockets[0].setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0)
sockets[0].bind(("127.0.0.1", 8000))
sockets[0].listen()
inst = self._makeWithSockets(_start=False, sockets=sockets)
channels = []
inst.channel_class = make_quick_shutdown_channel(client, channels, shutdown=False)
inst.task_dispatcher = DummyTaskDispatcher()

# This will make getpeername fail fast with EINVAL OSError
client.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
client.connect(("127.0.0.1", 8000))
client.send(b"1") # Send our fake request before we accept and close the connection
inst.handle_accept() # ShutdownServer will close the connection after acceot but before getpeername
self.assertRaises(OSError, sockets[0].getpeername)
self.assertNotEqual(channels, [])
channels[0].connected = False ## race condition means our socket is marked not connected

server_run_for_count(inst, 1) # Read the request
# self.assertTrue(channels[0].requests[0].error, "for this bug we need the request to have a parsing error")
server_run_for_count(inst, 5)
channels[0].service()
self.assertTrue(channels[0].close_when_flushed, "This prevents reads (which lead to close) and loops on handle_write (with 100% CPU)")
server_run_for_count(inst, 5) # Our loop
channels[0].last_activity = 0
inst.maintenance(1000)
self.assertEqual(channels[0].will_close, 1, "maintenance will try to close it")
self.assertNotIn(channels[0], inst._map.values(), "broken request didn't close the channel")
server_run_for_count(inst, 5) # Our loop
self.assertNotEqual(channels[0].count_writes, 10, "But we still get our loop")


if hasattr(socket, "AF_UNIX"):

Expand Down Expand Up @@ -516,3 +614,61 @@ def __init__(self):

def warning(self, msg, **kw):
self.logged.append(msg)


class ErrorRequest:
error = True # We are simulating a header parsing error
version = 1
data = None
completed = True
empty = False
headers_finished = True
expect_continue = False
retval = None
connection_close = False

def __init__(self, adj):
pass

def received(self, data):
self.data = data
if self.retval is not None:
return self.retval
return len(data)

def close(self):
pass


def make_quick_shutdown_channel(client, channels, shutdown=True):
class ShutdownChannel(HTTPChannel):
parser_class = ErrorRequest

def __init__(self, server, sock, addr, adj, map=None):
self.count_writes = self.count_close = self.count_wouldblock = 0
if shutdown:
client.close()
channels.append(self)
return HTTPChannel.__init__(self, server, sock, addr, adj, map)

def handle_write(self):
self.count_writes += 1
return HTTPChannel.handle_write(self)

def handle_close(self):
# import pdb; pdb.set_trace()
self.count_close += 1
return HTTPChannel.handle_close(self)

return ShutdownChannel


def server_run_for_count(inst, count=1):
# Modified server run to prevent infinite loop
inst.asyncore.loop(
timeout=inst.adj.asyncore_loop_timeout,
map=inst._map,
use_poll=inst.adj.asyncore_use_poll,
count=count
)

Loading