Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbuf_high_watermark #242

Merged
merged 6 commits into from Apr 6, 2019
Merged
@@ -11,6 +11,12 @@ Deprecations
Features
~~~~~~~~

- Add a new ``outbuf_high_watermark`` adjustment which is used to apply
backpressure on the ``app_iter`` to avoid letting it spin faster than data
can be written to the socket. This stabilizes responses that iterate quickly
with a lot of data.
See https://github.com/Pylons/waitress/pull/242

- Stop early and close the ``app_iter`` when attempting to write to a closed
socket due to a client disconnect. This should notify a long-lived streaming
response when a client hangs up.
@@ -43,6 +49,11 @@ Bugfixes
the server will die before benefiting from these changes.
See https://github.com/Pylons/waitress/pull/245

- Fix a bug in which a streaming ``app_iter`` may never cleanup data that has
already been sent. This would cause buffers in waitress to grow without
bounds. These buffers now properly rotate and release their data.
See https://github.com/Pylons/waitress/pull/242

1.2.1 (2019-01-25)
------------------

@@ -5,6 +5,6 @@

.. module:: waitress

.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)
.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1024, recv_bytes=8192, send_bytes=1, outbuf_overflow=1048576, outbuf_high_watermark=16777216, inbuf_overflow=524288, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)

See :ref:`arguments` for more information.
@@ -200,6 +200,13 @@ outbuf_overflow

Default: ``1048576`` (1MB)

outbuf_high_watermark
The app_iter will pause when pending output is larger than this value
and will resume once enough data is written to the socket to fall below
this threshold.

Default: ``16777216`` (16MB)

inbuf_overflow
A tempfile should be created if the pending input is larger than
inbuf_overflow, which is measured in bytes. The default is conservative.
@@ -152,6 +152,11 @@ Tuning options:
A temporary file should be created if the pending output is larger than
this. Default is 1048576 (1MB).

``--outbuf-high-watermark=INT``
The app_iter will pause when pending output is larger than this value
and will resume once enough data is written to the socket to fall below
this threshold. Default is 16777216 (16MB).

``--inbuf-overflow=INT``
A temporary file should be created if the pending input is larger than
this. Default is 524288 (512KB).
@@ -112,6 +112,7 @@ class Adjustments(object):
('recv_bytes', int),
('send_bytes', int),
('outbuf_overflow', int),
('outbuf_high_watermark', int),
('inbuf_overflow', int),
('connection_limit', int),
('cleanup_interval', int),
@@ -204,6 +205,10 @@ class Adjustments(object):
# is conservative.
outbuf_overflow = 1048576

# The app_iter will pause when pending output is larger than this value
# in bytes.
outbuf_high_watermark = 16777216

# A tempfile should be created if the pending input is larger than
# inbuf_overflow, which is measured in bytes. The default is 512K. This
# is conservative.
@@ -53,6 +53,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
close_when_flushed = False # set to True to close the socket when flushed
requests = () # currently pending requests
sent_continue = False # used as a latch after sending 100 continue
total_outbufs_len = 0 # total bytes ready to send
current_outbuf_count = 0 # total bytes written to current outbuf

#
# ASYNCHRONOUS METHODS (including __init__)
@@ -69,14 +71,13 @@ def __init__(
self.server = server
self.adj = adj
self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
self.total_outbufs_len = 0
self.creation_time = self.last_activity = time.time()
self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)

# task_lock used to push/pop requests
self.task_lock = threading.Lock()
# outbuf_lock used to access any outbuf
self.outbuf_lock = threading.RLock()
# outbuf_lock used to access any outbuf (expected to use an RLock)
self.outbuf_lock = threading.Condition()

wasyncore.dispatcher.__init__(self, sock, map=map)

@@ -122,7 +123,7 @@ def handle_write(self):
if self.adj.log_socket_errors:
self.logger.exception('Socket error')
self.will_close = True
except:
except Exception:
self.logger.exception('Unexpected exception when flushing')
self.will_close = True

@@ -177,6 +178,7 @@ def received(self, data):
# lock the outbuf to append to it.
outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n'
self.outbufs[-1].append(outbuf_payload)
self.current_outbuf_count += len(outbuf_payload)
self.total_outbufs_len += len(outbuf_payload)
self.sent_continue = True
self._flush_some()
@@ -205,6 +207,9 @@ def _flush_some_if_lockable(self):
if self.outbuf_lock.acquire(False):
try:
self._flush_some()

if self.total_outbufs_len < self.adj.outbuf_high_watermark:
self.outbuf_lock.notify()
finally:
self.outbuf_lock.release()

@@ -217,23 +222,8 @@ def _flush_some(self):
while True:
outbuf = self.outbufs[0]
# use outbuf.__len__ rather than len(outbuf) FBO of not getting
# OverflowError on Python 2
# OverflowError on 32-bit Python
outbuflen = outbuf.__len__()
if outbuflen <= 0:
# self.outbufs[-1] must always be a writable outbuf
if len(self.outbufs) > 1:
toclose = self.outbufs.pop(0)
try:
toclose.close()
except:
self.logger.exception(
'Unexpected error when closing an outbuf')
continue # pragma: no cover (coverage bug, it is hit)
else:
if hasattr(outbuf, 'prune'):
outbuf.prune()
dobreak = True

while outbuflen > 0:
chunk = outbuf.get(self.sendbuf_len)
num_sent = self.send(chunk)
@@ -243,8 +233,21 @@ def _flush_some(self):
sent += num_sent
self.total_outbufs_len -= num_sent
else:
# failed to write anything, break out entirely
dobreak = True
break
else:
# self.outbufs[-1] must always be a writable outbuf
if len(self.outbufs) > 1:
toclose = self.outbufs.pop(0)
try:
toclose.close()
except Exception:
self.logger.exception(
'Unexpected error when closing an outbuf')
else:
# caught up, done flushing for now
dobreak = True

if dobreak:
break
@@ -260,11 +263,12 @@ def handle_close(self):
for outbuf in self.outbufs:
try:
outbuf.close()
except:
except Exception:
self.logger.exception(
'Unknown exception while trying to close outbuf')
self.total_outbufs_len = 0
self.connected = False
self.outbuf_lock.notify()
wasyncore.dispatcher.close(self)

def add_channel(self, map=None):
@@ -299,18 +303,25 @@ def write_soon(self, data):
# the async mainloop might be popping data off outbuf; we can
# block here waiting for it because we're in a task thread
with self.outbuf_lock:
# check again after acquiring the lock to ensure the
# outbufs are not closed
if not self.connected: # pragma: no cover
self._flush_outbufs_below_high_watermark()
if not self.connected:
raise ClientDisconnected
if data.__class__ is ReadOnlyFileBasedBuffer:
# they used wsgi.file_wrapper
self.outbufs.append(data)
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
else:
# if we overflowed then start a new buffer to avoid
This conversation was marked as resolved by mmerickel

This comment has been minimized.

Copy link
@bertjwregeer

bertjwregeer Apr 6, 2019

Member

This sentence needs a rewrite.

"If we overflowed we start a new buffer to avoid this buffer growing unbounded"

For instance

# this buffer growing unbounded
if self.current_outbuf_count > self.adj.outbuf_high_watermark:
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
This conversation was marked as resolved by bertjwregeer

This comment has been minimized.

Copy link
@bertjwregeer

bertjwregeer Apr 6, 2019

Member

Each time we do this, we start back over with a new memory based buffer, until we overflow onto disk, is this something we should worry about?

This comment has been minimized.

Copy link
@mmerickel

mmerickel Apr 6, 2019

Author Member

You tell me, seems fine imo. We could always use a file once we start a file for a channel but I don't see the point I guess without some benchmarks.

self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
self.outbufs[-1].append(data)
num_bytes = len(data)
self.current_outbuf_count += num_bytes
self.total_outbufs_len += num_bytes
# XXX We might eventually need to pull the trigger here (to
# instruct select to stop blocking), but it slows things down so
@@ -319,6 +330,16 @@ def write_soon(self, data):
return num_bytes
return 0

def _flush_outbufs_below_high_watermark(self):
    """Block the calling task thread until pending output falls below
    ``outbuf_high_watermark``, applying backpressure to the app_iter.

    The async main loop notifies ``outbuf_lock`` (a threading.Condition)
    as data is flushed to the socket; ``handle_close`` also notifies so
    a waiter is released if the client disconnects mid-wait.
    """
    # check first to avoid locking if possible
    if self.total_outbufs_len > self.adj.outbuf_high_watermark:
        with self.outbuf_lock:
            # standard condition-variable loop: re-check the predicate
            # after every wakeup; stop waiting once disconnected since
            # no further flushing will happen on a closed channel
            while (
                self.connected and
                self.total_outbufs_len > self.adj.outbuf_high_watermark
            ):
                self.outbuf_lock.wait()

def service(self):
"""Execute all pending requests """
with self.task_lock:
@@ -334,7 +355,7 @@ def service(self):
self.logger.info('Client disconnected while serving %s' %
task.request.path)
task.close_on_finish = True
except:
except Exception:
self.logger.exception('Exception while serving %s' %
task.request.path)
if not task.wrote_header:
@@ -370,6 +391,15 @@ def service(self):
request.close()
self.requests = []
else:
# before processing a new request, ensure there is not too
# much data in the outbufs waiting to be flushed
# NB: currently readable() returns False while we are
# flushing data so we know no new requests will come in
# that we need to account for, otherwise it'd be better
# to do this check at the start of the request instead of
# at the end to account for consecutive service() calls
if len(self.requests) > 1:

This comment has been minimized.

Copy link
@bertjwregeer

bertjwregeer Apr 6, 2019

Member

Would it make sense here to kick the trigger?

This comment has been minimized.

Copy link
@mmerickel

mmerickel Apr 6, 2019

Author Member

I'm not familiar with the scenarios in which kicking the trigger is required.

This comment has been minimized.

Copy link
@mmerickel

mmerickel Apr 6, 2019

Author Member

It's not like the current behavior is worse than it was before. I would assume the situation is that the write returned 0 bytes written because the buffer was full, and the system would know that more data is writable before hitting a select timeout or something. If you think it's a problem then it should be kicked in _flush_outbufs_below_high_watermark which affects both app_iter writes as well as here between requests. I can't imagine the circumstances are different here than there.

self._flush_outbufs_below_high_watermark()
request = self.requests.pop(0)
request.close()

@@ -126,6 +126,11 @@
A temporary file should be created if the pending output is larger
than this. Default is 1048576 (1MB).
--outbuf-high-watermark=INT
The app_iter will pause when pending output is larger than this value
and will resume once enough data is written to the socket to fall below
this threshold. Default is 16777216 (16MB).
--inbuf-overflow=INT
A temporary file should be created if the pending input is larger
than this. Default is 524288 (512KB).
@@ -228,6 +228,74 @@ def test_write_soon_disconnected(self):
inst.connected = False
self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff'))

def test_write_soon_disconnected_while_over_watermark(self):
    """write_soon raises ClientDisconnected when the channel drops
    while a task thread is blocked on the high-watermark wait."""
    from waitress.channel import ClientDisconnected
    channel, sock, sockmap = self._makeOneWithMap()

    # Simulate the client hanging up during the backpressure wait by
    # having the flush hook flip the connected flag.
    def fake_flush_below_watermark():
        channel.connected = False

    channel._flush_outbufs_below_high_watermark = fake_flush_below_watermark
    with self.assertRaises(ClientDisconnected):
        channel.write_soon(b'stuff')

def test_write_soon_rotates_outbuf_on_overflow(self):
    """A write that finds the current outbuf already past the high
    watermark must rotate to a fresh outbuf instead of growing it."""
    channel, sock, sockmap = self._makeOneWithMap()
    channel.adj.outbuf_high_watermark = 3
    channel.current_outbuf_count = 4  # already over the watermark
    num_bytes = channel.write_soon(b'xyz')
    self.assertEqual(num_bytes, 3)
    # the payload landed in a newly rotated (second) buffer
    self.assertEqual(len(channel.outbufs), 2)
    self.assertEqual(channel.outbufs[0].get(), b'')
    self.assertEqual(channel.outbufs[1].get(), b'xyz')

def test_write_soon_waits_on_backpressure(self):
    # Pending output already exceeds the (tiny) high watermark, so
    # write_soon must wait on the outbuf condition before appending.
    inst, sock, map = self._makeOneWithMap()
    inst.adj.outbuf_high_watermark = 3
    inst.total_outbufs_len = 4
    inst.current_outbuf_count = 4
    class Lock(DummyLock):
        # simulate the main loop draining the outbufs while we wait,
        # so the condition loop's predicate becomes false after one wait
        def wait(self):
            inst.total_outbufs_len = 0
            super(Lock, self).wait()
    inst.outbuf_lock = Lock()
    wrote = inst.write_soon(b'xyz')
    self.assertEqual(wrote, 3)
    self.assertEqual(len(inst.outbufs), 2)
    self.assertEqual(inst.outbufs[0].get(), b'')
    self.assertEqual(inst.outbufs[1].get(), b'xyz')
    # proves the backpressure path actually blocked before writing
    self.assertTrue(inst.outbuf_lock.waited)

def test_handle_write_notify_after_flush(self):
    """After flushing drops pending output below the high watermark,
    handle_write must notify waiters on the outbuf condition."""
    channel, sock, sockmap = self._makeOneWithMap()
    channel.requests = [True]
    channel.outbufs = [DummyBuffer(b'abc')]
    channel.total_outbufs_len = len(channel.outbufs[0])
    channel.adj.send_bytes = 1
    channel.adj.outbuf_high_watermark = 5  # the flush drops us below this
    channel.will_close = False
    channel.last_activity = 0
    self.assertIsNone(channel.handle_write())
    self.assertFalse(channel.will_close)
    self.assertTrue(channel.outbuf_lock.acquired)
    # watermark satisfied -> blocked task threads were woken
    self.assertTrue(channel.outbuf_lock.notified)
    self.assertEqual(sock.sent, b'abc')

def test_handle_write_no_notify_after_flush(self):
    # The socket refuses all writes (send returns a falsy value), so
    # total_outbufs_len stays above the high watermark and blocked
    # task threads must NOT be notified.
    inst, sock, map = self._makeOneWithMap()
    inst.requests = [True]
    inst.outbufs = [DummyBuffer(b'abc')]
    inst.total_outbufs_len = len(inst.outbufs[0])
    inst.adj.send_bytes = 1
    inst.adj.outbuf_high_watermark = 2
    sock.send = lambda x: False
    inst.will_close = False
    inst.last_activity = 0
    result = inst.handle_write()
    self.assertEqual(result, None)
    self.assertEqual(inst.will_close, False)
    self.assertTrue(inst.outbuf_lock.acquired)
    # nothing was flushed, so no notification and no bytes on the wire
    self.assertFalse(inst.outbuf_lock.notified)
    self.assertEqual(sock.sent, b'')

def test__flush_some_empty_outbuf(self):
inst, sock, map = self._makeOneWithMap()
result = inst._flush_some()
@@ -652,6 +720,7 @@ def send(self, data):
return len(data)

class DummyLock(object):
notified = False

def __init__(self, acquirable=True):
self.acquirable = acquirable
@@ -664,6 +733,12 @@ def acquire(self, val):
def release(self):
self.released = True

def notify(self):
self.notified = True

def wait(self):
self.waited = True

def __exit__(self, type, val, traceback):
self.acquire(True)

@@ -695,6 +770,7 @@ def close(self):

class DummyAdjustments(object):
outbuf_overflow = 1048576
outbuf_high_watermark = 1048576
inbuf_overflow = 512000
cleanup_interval = 900
url_scheme = 'http'
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.