Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug that caused slow requests to block the processing of other concurrent requests #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions waitress/channel.py
Expand Up @@ -52,8 +52,6 @@ class HTTPChannel(logging_dispatcher, object):
close_when_flushed = False # set to True to close the socket when flushed
requests = () # currently pending requests
sent_continue = False # used as a latch after sending 100 continue
task_lock = thread.allocate_lock() # lock used to push/pop requests
outbuf_lock = thread.allocate_lock() # lock used to access any outbuf
force_flush = False # indicates a need to flush the outbuf

#
Expand All @@ -74,6 +72,10 @@ def __init__(
self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
self.creation_time = self.last_activity = time.time()
asyncore.dispatcher.__init__(self, sock, map=map)
# lock used to push/pop requests
self.task_lock = thread.allocate_lock()
# lock used to access any outbuf
self.outbuf_lock = thread.allocate_lock()

def any_outbuf_has_data(self):
for outbuf in self.outbufs:
Expand Down
24 changes: 24 additions & 0 deletions waitress/tests/fixtureapps/slow.py
@@ -0,0 +1,24 @@
import time

def app(environ, start_response):
path_info = environ['PATH_INFO']
if path_info == '/slow':
time.sleep(1)
body = b'slow'
else:
body = b'quick'
cl = str(len(body))
start_response(
'200 OK',
[('Content-Length', cl), ('Content-Type', 'text/plain')])
return [body]

if __name__ == '__main__':
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
h = NullHandler()
logging.getLogger('waitress').addHandler(h)
from waitress import serve
serve(app, port=61523, _quiet=True, expose_tracebacks=True)
38 changes: 38 additions & 0 deletions waitress/tests/test_functional.py
Expand Up @@ -1153,6 +1153,44 @@ def test_notfilelike_nocl_http10(self):
self.sock.send(to_send)
self.assertRaises(ConnectionClosed, read_http, fp)


class TestConcurrentRequests(SubprocessTests, unittest.TestCase):

def setUp(self):
slow = os.path.join(here, 'fixtureapps', 'slow.py')
self.start_subprocess([self.exe, slow])
self.bodies = []
self.sent = []

def tearDown(self):
self.stop_subprocess()

def _make_socket(self):
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def _send_request(self, path):
to_send = "GET %s HTTP/1.0\n\n" % path
to_send = tobytes(to_send)
socket = self._make_socket()
socket.connect((self.host, self.port))
socket.send(to_send)
self.sent.append(path)
fp = socket.makefile('rb', 0)
line, headers, response_body = read_http(fp)
self.bodies.append(response_body)

def test_slow_requests_do_not_block_other_threads(self):
import thread
thread.start_new_thread(self._send_request, ('/slow', ))
# wait a bit to make sure that '/slow' is requested before '/quick'
time.sleep(0.3)
thread.start_new_thread(self._send_request, ('/quick', ))
while len(self.bodies) < 2:
time.sleep(.1) # wait for requests to be sent and processed
self.assertEqual(self.sent, ['/slow', '/quick'])
self.assertEqual(self.bodies, ['quick', 'slow'])


def parse_headers(fp):
"""Parses only RFC2822 headers from a file pointer.
"""
Expand Down