From fab8f1a75145a9397cc013fe830c12e65a62ece7 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 2 Apr 2019 00:25:28 -0500 Subject: [PATCH 1/4] fix a deadlock when the client disconnects while flushing data --- waitress/channel.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index a377a54b..e717dd77 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -151,7 +151,7 @@ def handle_write(self): self.will_close = True if self.will_close: - self.handle_close() + self.handle_close(lock=False) def readable(self): # We might want to create a new task. We can only do this if: @@ -168,7 +168,7 @@ def handle_read(self): except socket.error: if self.adj.log_socket_errors: self.logger.exception('Socket error') - self.handle_close() + self.handle_close(lock=True) return if data: self.last_activity = time.time() @@ -274,10 +274,11 @@ def _flush_some(self): return False - def handle_close(self): - # avoid closing the outbufs while a task is potentially adding data - # to them in write_soon - with self.outbuf_lock: + def handle_close(self, lock=True): + # NB: default to True for when asyncore calls this function directly + if lock: + self.outbuf_lock.acquire() + try: for outbuf in self.outbufs: try: outbuf.close() @@ -285,6 +286,9 @@ def handle_close(self): self.logger.exception( 'Unknown exception while trying to close outbuf') self.connected = False + finally: + if lock: + self.outbuf_lock.release() wasyncore.dispatcher.close(self) def add_channel(self, map=None): From 5bf34d52a27e3771718c67e52985d0f5bd6a838f Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 2 Apr 2019 00:26:47 -0500 Subject: [PATCH 2/4] add changelog for #241 --- CHANGES.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.txt b/CHANGES.txt index ef08b181..4c583e20 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ Bugfixes response when a client hangs up. See https://github.com/Pylons/waitress/pull/238 and https://github.com/Pylons/waitress/pull/240 + and https://github.com/Pylons/waitress/pull/241 - When a client closes a socket unexpectedly there was potential for memory leaks in which data was written to the buffers after they were closed, From cacdafdc3451b337f7091e042444b90f056889db Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 2 Apr 2019 01:02:22 -0500 Subject: [PATCH 3/4] switch to an RLock as handle_close is called sometimes during _flush_some --- waitress/channel.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index e717dd77..cb829f07 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -75,7 +75,7 @@ def __init__( # task_lock used to push/pop requests self.task_lock = threading.Lock() # outbuf_lock used to access any outbuf - self.outbuf_lock = threading.Lock() + self.outbuf_lock = threading.RLock() wasyncore.dispatcher.__init__(self, sock, map=map) @@ -151,7 +151,7 @@ def handle_write(self): self.will_close = True if self.will_close: - self.handle_close(lock=False) + self.handle_close() def readable(self): # We might want to create a new task. We can only do this if: @@ -168,7 +168,7 @@ def handle_read(self): except socket.error: if self.adj.log_socket_errors: self.logger.exception('Socket error') - self.handle_close(lock=True) + self.handle_close() return if data: self.last_activity = time.time() @@ -274,11 +274,9 @@ def _flush_some(self): return False - def handle_close(self, lock=True): + def handle_close(self): # NB: default to True for when asyncore calls this function directly - if lock: - self.outbuf_lock.acquire() - try: + with self.outbuf_lock: for outbuf in self.outbufs: try: outbuf.close() @@ -286,9 +284,6 @@ def handle_close(self, lock=True): self.logger.exception( 'Unknown exception while trying to close outbuf') self.connected = False - finally: - if lock: - self.outbuf_lock.release() wasyncore.dispatcher.close(self) def add_channel(self, map=None): From 042c52f8768a10ef377ecb6c04a1d6b3cce1ade6 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 2 Apr 2019 01:09:37 -0500 Subject: [PATCH 4/4] optimize tracking of pending outbuf bytes --- waitress/channel.py | 24 +++++++++++------------- waitress/tests/test_channel.py | 18 ++++++++++++------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/waitress/channel.py b/waitress/channel.py index cb829f07..fd5a3f5c 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -70,6 +70,7 @@ def __init__( self.server = server self.adj = adj self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)] + self.total_outbufs_len = 0 self.creation_time = self.last_activity = time.time() # task_lock used to push/pop requests @@ -83,16 +84,7 @@ def __init__( self.addr = addr def any_outbuf_has_data(self): - for outbuf in self.outbufs: - if bool(outbuf): - return True - return False - - def total_outbufs_len(self): - # genexpr == more funccalls - # use b.__len__ rather than len(b) FBO of not getting OverflowError - # on Python 2 - return sum([b.__len__() for b in self.outbufs]) + return self.total_outbufs_len > 0 def writable(self): # if there's data in the out buffer or we've been instructed to close @@ -124,7 +116,7 @@ def handle_write(self): # won't get done. flush = self._flush_some_if_lockable self.force_flush = False - elif (self.total_outbufs_len() >= self.adj.send_bytes): + elif (self.total_outbufs_len >= self.adj.send_bytes): # 1. There's a running task, so we need to try to lock # the outbuf before sending # 2. Only try to send if the data in the out buffer is larger @@ -196,7 +188,9 @@ def received(self, data): if not self.sent_continue: # there's no current task, so we don't need to try to # lock the outbuf to append to it. - self.outbufs[-1].append(b'HTTP/1.1 100 Continue\r\n\r\n') + outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n' + self.outbufs[-1].append(outbuf_payload) + self.total_outbufs_len += len(outbuf_payload) self.sent_continue = True self._flush_some() request.completed = False @@ -261,6 +255,7 @@ def _flush_some(self): outbuf.skip(num_sent, True) outbuflen -= num_sent sent += num_sent + self.total_outbufs_len -= num_sent else: dobreak = True break @@ -283,6 +278,7 @@ def handle_close(self): except: self.logger.exception( 'Unknown exception while trying to close outbuf') + self.total_outbufs_len = 0 self.connected = False wasyncore.dispatcher.close(self) @@ -329,11 +325,13 @@ def write_soon(self, data): self.outbufs.append(nextbuf) else: self.outbufs[-1].append(data) + num_bytes = len(data) + self.total_outbufs_len += num_bytes # XXX We might eventually need to pull the trigger here (to # instruct select to stop blocking), but it slows things down so # much that I'll hold off for now; "server push" on otherwise # unbusy systems may suffer. - return len(data) + return num_bytes return 0 def service(self): diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index 7efd3b39..df20dab6 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -25,18 +25,23 @@ def test_ctor(self): def test_total_outbufs_len_an_outbuf_size_gt_sys_maxint(self): from waitress.compat import MAXINT inst, _, map = self._makeOneWithMap() - class DummyHugeBuffer(object): + class DummyBuffer(object): + chunks = [] + def append(self, data): + self.chunks.append(data) + class DummyData(object): def __len__(self): - return MAXINT + 1 - inst.outbufs = [DummyHugeBuffer()] - result = inst.total_outbufs_len() + return MAXINT + inst.total_outbufs_len = 1 + inst.outbufs = [DummyBuffer()] + inst.write_soon(DummyData()) # we are testing that this method does not raise an OverflowError # (see https://github.com/Pylons/waitress/issues/47) - self.assertEqual(result, MAXINT+1) + self.assertEqual(inst.total_outbufs_len, MAXINT+1) def test_writable_something_in_outbuf(self): inst, sock, map = self._makeOneWithMap() - inst.outbufs[0].append(b'abc') + inst.total_outbufs_len = 3 self.assertTrue(inst.writable()) def test_writable_nothing_in_outbuf(self): @@ -132,6 +137,7 @@ def test_handle_write_no_requests_outbuf_gt_send_bytes(self): inst, sock, map = self._makeOneWithMap() inst.requests = [True] inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = 3 inst.adj.send_bytes = 2 inst.will_close = False inst.last_activity = 0