From 610544cfd7374d0c0067ba1dec217bb7b4c9092d Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Thu, 28 Mar 2019 01:57:42 -0500 Subject: [PATCH 1/6] add backpressure on the app_iter if the socket can't send data quickly enough --- CHANGES.txt | 18 +++++++-- docs/api.rst | 2 +- docs/arguments.rst | 7 ++++ docs/runner.rst | 5 +++ waitress/adjustments.py | 5 +++ waitress/channel.py | 40 ++++++++++++++++--- waitress/runner.py | 5 +++ waitress/tests/test_channel.py | 71 ++++++++++++++++++++++++++++++++++ 8 files changed, 144 insertions(+), 9 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index be245593..d5c6274f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,15 @@ Deprecations Features ~~~~~~~~ +- Add a new ``outbuf_high_watermark`` adjustment which is used to apply + backpressure on the ``app_iter`` to avoid letting it spin faster than data + can be written to the socket. This stabilizes responses that iterate quickly + with a lot of data. + See https://github.com/Pylons/waitress/pull/242 + +Bugfixes +~~~~~~~~ + - Stop early and close the ``app_iter`` when attempting to write to a closed socket due to a client disconnect. This should notify a long-lived streaming response when a client hangs up. @@ -26,9 +35,6 @@ Features still be flushed efficiently. See https://github.com/Pylons/waitress/pull/246 -Bugfixes -~~~~~~~~ - - When a client closes a socket unexpectedly there was potential for memory leaks in which data was written to the buffers after they were closed, causing them to reopen. @@ -43,6 +49,12 @@ Bugfixes the server will die before benefiting from these changes. See https://github.com/Pylons/waitress/pull/245 +- Fix a bug in which an ``app_iter`` that emits data chunks quicker than can + be written to a socket may never cleanup data that has already been sent. + This would cause buffers in waitress to grow without bounds. These buffers + now properly reclaim their space as data is written to the socket. + See https://github.com/Pylons/waitress/pull/242 + 1.2.1 (2019-01-25) ------------------ diff --git a/docs/api.rst b/docs/api.rst index 70c174cb..a921a1b2 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -5,6 +5,6 @@ .. module:: waitress -.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) +.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, outbuf_high_watermark=16777216, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) See :ref:`arguments` for more information. diff --git a/docs/arguments.rst b/docs/arguments.rst index 8bacc44a..22535a80 100644 --- a/docs/arguments.rst +++ b/docs/arguments.rst @@ -200,6 +200,13 @@ outbuf_overflow Default: ``1048576`` (1MB) +outbuf_high_watermark + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. + + Default: ``16777216`` (16MB) + inbuf_overflow A tempfile should be created if the pending input is larger than inbuf_overflow, which is measured in bytes. The default is conservative. diff --git a/docs/runner.rst b/docs/runner.rst index 0b613074..2776e444 100644 --- a/docs/runner.rst +++ b/docs/runner.rst @@ -152,6 +152,11 @@ Tuning options: A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). +``--outbuf-high-watermark=INT`` + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + ``--inbuf-overflow=INT`` A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/adjustments.py b/waitress/adjustments.py index 3b0b3642..5c1879b9 100644 --- a/waitress/adjustments.py +++ b/waitress/adjustments.py @@ -112,6 +112,7 @@ class Adjustments(object): ('recv_bytes', int), ('send_bytes', int), ('outbuf_overflow', int), + ('outbuf_high_watermark', int), ('inbuf_overflow', int), ('connection_limit', int), ('cleanup_interval', int), @@ -204,6 +205,10 @@ class Adjustments(object): # is conservative. outbuf_overflow = 1048576 + # The app_iter will pause when pending output is larger than this value + # in bytes. + outbuf_high_watermark = 16777216 + # A tempfile should be created if the pending input is larger than # inbuf_overflow, which is measured in bytes. The default is 512K. This # is conservative. diff --git a/waitress/channel.py b/waitress/channel.py index 8fee59c5..298188a8 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -75,8 +75,8 @@ def __init__( # task_lock used to push/pop requests self.task_lock = threading.Lock() - # outbuf_lock used to access any outbuf - self.outbuf_lock = threading.RLock() + # outbuf_lock used to access any outbuf (expected to use an RLock) + self.outbuf_lock = threading.Condition() wasyncore.dispatcher.__init__(self, sock, map=map) @@ -205,6 +205,9 @@ def _flush_some_if_lockable(self): if self.outbuf_lock.acquire(False): try: self._flush_some() + + if self.total_outbufs_len < self.adj.outbuf_high_watermark: + self.outbuf_lock.notify() finally: self.outbuf_lock.release() @@ -265,6 +268,7 @@ def handle_close(self): 'Unknown exception while trying to close outbuf') self.total_outbufs_len = 0 self.connected = False + self.outbuf_lock.notify() wasyncore.dispatcher.close(self) def add_channel(self, map=None): @@ -299,9 +303,8 @@ def write_soon(self, data): # the async mainloop might be popping data off outbuf; we can # block here waiting for it because we're in a task thread with self.outbuf_lock: - # check again after acquiring the lock to ensure we the - # outbufs are not closed - if not self.connected: # pragma: no cover + overflowed = self._flush_outbufs_below_high_watermark() + if not self.connected: raise ClientDisconnected if data.__class__ is ReadOnlyFileBasedBuffer: # they used wsgi.file_wrapper @@ -309,6 +312,12 @@ def write_soon(self, data): nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) else: + # if we overflowed then start a new buffer to ensure + # the original eventually gets pruned otherwise it may + # grow unbounded + if overflowed: + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) self.outbufs[-1].append(data) num_bytes = len(data) self.total_outbufs_len += num_bytes @@ -319,6 +328,18 @@ def write_soon(self, data): return num_bytes return 0 + def _flush_outbufs_below_high_watermark(self): + overflowed = self.total_outbufs_len > self.adj.outbuf_high_watermark + # check first to avoid locking if possible + if overflowed: + with self.outbuf_lock: + while ( + self.connected and + self.total_outbufs_len > self.adj.outbuf_high_watermark + ): + self.outbuf_lock.wait() + return overflowed + def service(self): """Execute all pending requests """ with self.task_lock: @@ -370,6 +391,15 @@ def service(self): request.close() self.requests = [] else: + # before processing a new request, ensure there is not too + # much data in the outbufs waiting to be flushed + # NB: currently readable() returns False while we are + # flushing data so we know no new requests will come in + # that we need to account for, otherwise it'd be better + # to do this check at the start of the request instead of + # at the end to account for consecutive service() calls + if len(self.requests) > 1: + self._flush_outbufs_below_high_watermark() request = self.requests.pop(0) request.close() diff --git a/waitress/runner.py b/waitress/runner.py index abdb38e8..67972762 100644 --- a/waitress/runner.py +++ b/waitress/runner.py @@ -126,6 +126,11 @@ A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). + --outbuf-high-watermark=INT + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + --inbuf-overflow=INT A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index f66766b9..ff3bba6e 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -228,6 +228,65 @@ def test_write_soon_disconnected(self): inst.connected = False self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + def test_write_soon_disconnected_while_over_watermark(self): + from waitress.channel import ClientDisconnected + inst, sock, map = self._makeOneWithMap() + def dummy_flush(): + inst.connected = False + inst._flush_outbufs_below_high_watermark = dummy_flush + self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + + def test_write_soon_rotates_outbuf_on_overflow(self): + inst, sock, map = self._makeOneWithMap() + inst.adj.outbuf_high_watermark = 3 + inst.outbufs = [DummyBuffer(b'abcd')] + inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + class Lock(DummyLock): + def wait(self): + inst.outbufs[0].prune() + inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + super(Lock, self).wait() + inst.outbuf_lock = Lock() + wrote = inst.write_soon(b'xyz') + self.assertEqual(wrote, 3) + self.assertEqual(len(inst.outbufs), 2) + self.assertTrue(inst.outbufs[0].pruned) + self.assertEqual(inst.outbufs[1].get(), b'xyz') + self.assertTrue(inst.outbuf_lock.waited) + + def test_handle_write_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 5 + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertTrue(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'abc') + + def test_handle_write_no_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 2 + sock.send = lambda x: False + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertFalse(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'') + def test__flush_some_empty_outbuf(self): inst, sock, map = self._makeOneWithMap() result = inst._flush_some() @@ -652,6 +711,7 @@ def send(self, data): return len(data) class DummyLock(object): + notified = False def __init__(self, acquirable=True): self.acquirable = acquirable @@ -664,6 +724,12 @@ def acquire(self, val): def release(self): self.released = True + def notify(self): + self.notified = True + + def wait(self): + self.waited = True + def __exit__(self, type, val, traceback): self.acquire(True) @@ -687,6 +753,10 @@ def get(self, *arg): def skip(self, num, x): self.skipped = num + def prune(self): + self.pruned = True + self.data = b'' + def __len__(self): return len(self.data) @@ -695,6 +765,7 @@ def close(self): class DummyAdjustments(object): outbuf_overflow = 1048576 + outbuf_high_watermark = 1048576 inbuf_overflow = 512000 cleanup_interval = 900 url_scheme = 'http' From b4df0057b599d84f8e55beaab55b9be6c0958ce7 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Fri, 5 Apr 2019 13:00:38 -0500 Subject: [PATCH 2/6] rotate outbuf if we've written more than outbuf_high_watermark bytes to it --- waitress/channel.py | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index 298188a8..2da4016e 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -53,6 +53,8 @@ class HTTPChannel(wasyncore.dispatcher, object): close_when_flushed = False # set to True to close the socket when flushed requests = () # currently pending requests sent_continue = False # used as a latch after sending 100 continue + total_outbufs_len = 0 # total bytes ready to send + current_outbuf_count = 0 # total bytes written to current outbuf # # ASYNCHRONOUS METHODS (including __init__) @@ -69,7 +71,6 @@ def __init__( self.server = server self.adj = adj self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)] - self.total_outbufs_len = 0 self.creation_time = self.last_activity = time.time() self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) @@ -122,7 +123,7 @@ def handle_write(self): if self.adj.log_socket_errors: self.logger.exception('Socket error') self.will_close = True - except: + except Exception: self.logger.exception('Unexpected exception when flushing') self.will_close = True @@ -177,6 +178,7 @@ def received(self, data): # lock the outbuf to append to it. outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n' self.outbufs[-1].append(outbuf_payload) + self.current_outbuf_count += len(outbuf_payload) self.total_outbufs_len += len(outbuf_payload) self.sent_continue = True self._flush_some() @@ -222,21 +224,6 @@ def _flush_some(self): # use outbuf.__len__ rather than len(outbuf) FBO of not getting # OverflowError on Python 2 outbuflen = outbuf.__len__() - if outbuflen <= 0: - # self.outbufs[-1] must always be a writable outbuf - if len(self.outbufs) > 1: - toclose = self.outbufs.pop(0) - try: - toclose.close() - except: - self.logger.exception( - 'Unexpected error when closing an outbuf') - continue # pragma: no cover (coverage bug, it is hit) - else: - if hasattr(outbuf, 'prune'): - outbuf.prune() - dobreak = True - while outbuflen > 0: chunk = outbuf.get(self.sendbuf_len) num_sent = self.send(chunk) @@ -246,8 +233,21 @@ def _flush_some(self): sent += num_sent self.total_outbufs_len -= num_sent else: + # failed to write anything, break out entirely dobreak = True break + else: + # self.outbufs[-1] must always be a writable outbuf + if len(self.outbufs) > 1: + toclose = self.outbufs.pop(0) + try: + toclose.close() + except Exception: + self.logger.exception( + 'Unexpected error when closing an outbuf') + else: + # caught up, done flushing for now + dobreak = True if dobreak: break @@ -263,7 +263,7 @@ def handle_close(self): for outbuf in self.outbufs: try: outbuf.close() - except: + except Exception: self.logger.exception( 'Unknown exception while trying to close outbuf') self.total_outbufs_len = 0 @@ -311,15 +311,20 @@ def write_soon(self, data): self.outbufs.append(data) nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 else: # if we overflowed then start a new buffer to ensure # the original eventually gets pruned otherwise it may # grow unbounded - if overflowed: + if overflowed or ( + self.current_outbuf_count > self.adj.outbuf_high_watermark + ): nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 self.outbufs[-1].append(data) num_bytes = len(data) + self.current_outbuf_count += num_bytes self.total_outbufs_len += num_bytes # XXX We might eventually need to pull the trigger here (to # instruct select to stop blocking), but it slows things down so @@ -355,7 +360,7 @@ def service(self): self.logger.info('Client disconnected while serving %s' % task.request.path) task.close_on_finish = True - except: + except Exception: self.logger.exception('Exception while serving %s' % task.request.path) if not task.wrote_header: From ecf13757fd585dbf7f2f9539663c69e261442c29 Mon Sep 17 00:00:00 2001 From: Bert JW Regeer Date: Fri, 5 Apr 2019 13:17:01 -0500 Subject: [PATCH 3/6] clarify comment on len() issues Co-Authored-By: mmerickel --- waitress/channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waitress/channel.py b/waitress/channel.py index 2da4016e..27eb5321 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -222,7 +222,7 @@ def _flush_some(self): while True: outbuf = self.outbufs[0] # use outbuf.__len__ rather than len(outbuf) FBO of not getting - # OverflowError on Python 2 + # OverflowError on 32-bit Python outbuflen = outbuf.__len__() while outbuflen > 0: chunk = outbuf.get(self.sendbuf_len) From 291f8e196ccb4343e2539a55d480b0b5a76d1e87 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Fri, 5 Apr 2019 13:19:13 -0500 Subject: [PATCH 4/6] simplify rotation --- waitress/channel.py | 17 ++++++----------- waitress/tests/test_channel.py | 23 ++++++++++++++--------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index 27eb5321..6f207428 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -222,7 +222,7 @@ def _flush_some(self): while True: outbuf = self.outbufs[0] # use outbuf.__len__ rather than len(outbuf) FBO of not getting - # OverflowError on 32-bit Python + # OverflowError on 32-bit Python outbuflen = outbuf.__len__() while outbuflen > 0: chunk = outbuf.get(self.sendbuf_len) @@ -303,7 +303,7 @@ def write_soon(self, data): # the async mainloop might be popping data off outbuf; we can # block here waiting for it because we're in a task thread with self.outbuf_lock: - overflowed = self._flush_outbufs_below_high_watermark() + self._flush_outbufs_below_high_watermark() if not self.connected: raise ClientDisconnected if data.__class__ is ReadOnlyFileBasedBuffer: @@ -313,12 +313,9 @@ def write_soon(self, data): self.outbufs.append(nextbuf) self.current_outbuf_count = 0 else: - # if we overflowed then start a new buffer to ensure - # the original eventually gets pruned otherwise it may - # grow unbounded - if overflowed or ( - self.current_outbuf_count > self.adj.outbuf_high_watermark - ): + # if we overflowed then start a new buffer to ensure avoid + # this buffer growing unbounded + if self.current_outbuf_count > self.adj.outbuf_high_watermark: nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) self.current_outbuf_count = 0 @@ -334,16 +331,14 @@ def write_soon(self, data): return 0 def _flush_outbufs_below_high_watermark(self): - overflowed = self.total_outbufs_len > self.adj.outbuf_high_watermark # check first to avoid locking if possible - if overflowed: + if self.total_outbufs_len > self.adj.outbuf_high_watermark: with self.outbuf_lock: while ( self.connected and self.total_outbufs_len > self.adj.outbuf_high_watermark ): self.outbuf_lock.wait() - return overflowed def service(self): """Execute all pending requests """ diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index ff3bba6e..35fccf6e 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -239,18 +239,27 @@ def dummy_flush(): def test_write_soon_rotates_outbuf_on_overflow(self): inst, sock, map = self._makeOneWithMap() inst.adj.outbuf_high_watermark = 3 - inst.outbufs = [DummyBuffer(b'abcd')] - inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + inst.current_outbuf_count = 4 + wrote = inst.write_soon(b'xyz') + self.assertEqual(wrote, 3) + self.assertEqual(len(inst.outbufs), 2) + self.assertEqual(inst.outbufs[0].get(), b'') + self.assertEqual(inst.outbufs[1].get(), b'xyz') + + def test_write_soon_waits_on_backpressure(self): + inst, sock, map = self._makeOneWithMap() + inst.adj.outbuf_high_watermark = 3 + inst.total_outbufs_len = 4 + inst.current_outbuf_count = 4 class Lock(DummyLock): def wait(self): - inst.outbufs[0].prune() - inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + inst.total_outbufs_len = 0 super(Lock, self).wait() inst.outbuf_lock = Lock() wrote = inst.write_soon(b'xyz') self.assertEqual(wrote, 3) self.assertEqual(len(inst.outbufs), 2) - self.assertTrue(inst.outbufs[0].pruned) + self.assertEqual(inst.outbufs[0].get(), b'') self.assertEqual(inst.outbufs[1].get(), b'xyz') self.assertTrue(inst.outbuf_lock.waited) @@ -753,10 +762,6 @@ def get(self, *arg): def skip(self, num, x): self.skipped = num - def prune(self): - self.pruned = True - self.data = b'' - def __len__(self): return len(self.data) From 4ee5af662d7cfc8ff9c212a397f5749dc7f9a879 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Fri, 5 Apr 2019 13:42:39 -0500 Subject: [PATCH 5/6] adjust changelog --- CHANGES.txt | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index d5c6274f..2897939a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,9 +17,6 @@ Features with a lot of data. See https://github.com/Pylons/waitress/pull/242 -Bugfixes -~~~~~~~~ - - Stop early and close the ``app_iter`` when attempting to write to a closed socket due to a client disconnect. This should notify a long-lived streaming response when a client hangs up. @@ -35,6 +32,9 @@ Bugfixes still be flushed efficiently. See https://github.com/Pylons/waitress/pull/246 +Bugfixes +~~~~~~~~ + - When a client closes a socket unexpectedly there was potential for memory leaks in which data was written to the buffers after they were closed, causing them to reopen. @@ -49,10 +49,9 @@ Bugfixes the server will die before benefiting from these changes. See https://github.com/Pylons/waitress/pull/245 -- Fix a bug in which an ``app_iter`` that emits data chunks quicker than can - be written to a socket may never cleanup data that has already been sent. - This would cause buffers in waitress to grow without bounds. These buffers - now properly reclaim their space as data is written to the socket. +- Fix a bug in which a streaming ``app_iter`` may never cleanup data that has + already been sent. This would cause buffers in waitress to grow without + bounds. These buffers now properly rotate and release their data. See https://github.com/Pylons/waitress/pull/242 1.2.1 (2019-01-25) From 9e1b10aa3d8787868ac8d7556b27d36a22357660 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Sat, 6 Apr 2019 16:36:03 -0500 Subject: [PATCH 6/6] fix comment --- waitress/channel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index 6f207428..12e6fce8 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -313,9 +313,9 @@ def write_soon(self, data): self.outbufs.append(nextbuf) self.current_outbuf_count = 0 else: - # if we overflowed then start a new buffer to ensure avoid - # this buffer growing unbounded if self.current_outbuf_count > self.adj.outbuf_high_watermark: + # rotate to a new buffer if the current buffer has hit + # the watermark to avoid it growing unbounded nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) self.current_outbuf_count = 0