Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a deadlock when the client disconnects while flushing data #241

Merged
merged 4 commits into from Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -9,6 +9,7 @@ Bugfixes
response when a client hangs up.
See https://github.com/Pylons/waitress/pull/238
and https://github.com/Pylons/waitress/pull/240
and https://github.com/Pylons/waitress/pull/241

- When a client closes a socket unexpectedly there was potential for memory
leaks in which data was written to the buffers after they were closed,
Expand Down
29 changes: 13 additions & 16 deletions waitress/channel.py
Expand Up @@ -70,29 +70,21 @@ def __init__(
self.server = server
self.adj = adj
self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
self.total_outbufs_len = 0
self.creation_time = self.last_activity = time.time()

# task_lock used to push/pop requests
self.task_lock = threading.Lock()
# outbuf_lock used to access any outbuf
self.outbuf_lock = threading.Lock()
self.outbuf_lock = threading.RLock()

wasyncore.dispatcher.__init__(self, sock, map=map)

# Don't let wasyncore.dispatcher throttle self.addr on us.
self.addr = addr

def any_outbuf_has_data(self):
for outbuf in self.outbufs:
if bool(outbuf):
return True
return False

def total_outbufs_len(self):
# genexpr == more funccalls
# use b.__len__ rather than len(b) FBO of not getting OverflowError
# on Python 2
return sum([b.__len__() for b in self.outbufs])
return self.total_outbufs_len > 0

def writable(self):
# if there's data in the out buffer or we've been instructed to close
Expand Down Expand Up @@ -124,7 +116,7 @@ def handle_write(self):
# won't get done.
flush = self._flush_some_if_lockable
self.force_flush = False
elif (self.total_outbufs_len() >= self.adj.send_bytes):
elif (self.total_outbufs_len >= self.adj.send_bytes):
# 1. There's a running task, so we need to try to lock
# the outbuf before sending
# 2. Only try to send if the data in the out buffer is larger
Expand Down Expand Up @@ -196,7 +188,9 @@ def received(self, data):
if not self.sent_continue:
# there's no current task, so we don't need to try to
# lock the outbuf to append to it.
self.outbufs[-1].append(b'HTTP/1.1 100 Continue\r\n\r\n')
outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n'
self.outbufs[-1].append(outbuf_payload)
self.total_outbufs_len += len(outbuf_payload)
self.sent_continue = True
self._flush_some()
request.completed = False
Expand Down Expand Up @@ -261,6 +255,7 @@ def _flush_some(self):
outbuf.skip(num_sent, True)
outbuflen -= num_sent
sent += num_sent
self.total_outbufs_len -= num_sent
else:
dobreak = True
break
Expand All @@ -275,15 +270,15 @@ def _flush_some(self):
return False

def handle_close(self):
# avoid closing the outbufs while a task is potentially adding data
# to them in write_soon
# NB: default to True for when asyncore calls this function directly
with self.outbuf_lock:
for outbuf in self.outbufs:
try:
outbuf.close()
except:
self.logger.exception(
'Unknown exception while trying to close outbuf')
self.total_outbufs_len = 0
self.connected = False
wasyncore.dispatcher.close(self)

Expand Down Expand Up @@ -330,11 +325,13 @@ def write_soon(self, data):
self.outbufs.append(nextbuf)
else:
self.outbufs[-1].append(data)
num_bytes = len(data)
self.total_outbufs_len += num_bytes
# XXX We might eventually need to pull the trigger here (to
# instruct select to stop blocking), but it slows things down so
# much that I'll hold off for now; "server push" on otherwise
# unbusy systems may suffer.
return len(data)
return num_bytes
return 0

def service(self):
Expand Down
18 changes: 12 additions & 6 deletions waitress/tests/test_channel.py
Expand Up @@ -25,18 +25,23 @@ def test_ctor(self):
def test_total_outbufs_len_an_outbuf_size_gt_sys_maxint(self):
from waitress.compat import MAXINT
inst, _, map = self._makeOneWithMap()
class DummyHugeBuffer(object):
class DummyBuffer(object):
chunks = []
def append(self, data):
self.chunks.append(data)
class DummyData(object):
def __len__(self):
return MAXINT + 1
inst.outbufs = [DummyHugeBuffer()]
result = inst.total_outbufs_len()
return MAXINT
inst.total_outbufs_len = 1
inst.outbufs = [DummyBuffer()]
inst.write_soon(DummyData())
# we are testing that this method does not raise an OverflowError
# (see https://github.com/Pylons/waitress/issues/47)
self.assertEqual(result, MAXINT+1)
self.assertEqual(inst.total_outbufs_len, MAXINT+1)

def test_writable_something_in_outbuf(self):
inst, sock, map = self._makeOneWithMap()
inst.outbufs[0].append(b'abc')
inst.total_outbufs_len = 3
self.assertTrue(inst.writable())

def test_writable_nothing_in_outbuf(self):
Expand Down Expand Up @@ -132,6 +137,7 @@ def test_handle_write_no_requests_outbuf_gt_send_bytes(self):
inst, sock, map = self._makeOneWithMap()
inst.requests = [True]
inst.outbufs = [DummyBuffer(b'abc')]
inst.total_outbufs_len = 3
inst.adj.send_bytes = 2
inst.will_close = False
inst.last_activity = 0
Expand Down