Skip to content
Browse files

Further refactoring of duplicated IOStream logic

  • Loading branch information...
1 parent 01dd70c commit 2db0aceb32f5c042f5306e72a4679580b4359f34 @bdarnell bdarnell committed Feb 19, 2012
Showing with 18 additions and 12 deletions.
  1. +18 −12 tornado/iostream.py
View
30 tornado/iostream.py
@@ -137,15 +137,15 @@ def connect(self, address, callback=None):
def read_until_regex(self, regex, callback):
"""Call callback when we read the given regex pattern."""
- assert not self._read_callback, "Already reading"
+ self._set_read_callback(callback)
self._read_regex = re.compile(regex)
- self._read_until(callback)
+ self._try_inline_read()
def read_until(self, delimiter, callback):
"""Call callback when we read the given delimiter."""
- assert not self._read_callback, "Already reading"
+ self._set_read_callback(callback)
self._read_delimiter = delimiter
- self._read_until(callback)
+ self._try_inline_read()
def read_bytes(self, num_bytes, callback, streaming_callback=None):
"""Call callback when we read the given number of bytes.
@@ -154,11 +154,11 @@ def read_bytes(self, num_bytes, callback, streaming_callback=None):
of data as they become available, and the argument to the final
``callback`` will be empty.
"""
- assert not self._read_callback, "Already reading"
+ self._set_read_callback(callback)
assert isinstance(num_bytes, (int, long))
self._read_bytes = num_bytes
self._streaming_callback = stack_context.wrap(streaming_callback)
- self._read_until(callback)
+ self._try_inline_read()
def read_until_close(self, callback, streaming_callback=None):
"""Reads all data from the socket until it is closed.
@@ -170,12 +170,12 @@ def read_until_close(self, callback, streaming_callback=None):
Subject to ``max_buffer_size`` limit from `IOStream` constructor if
a ``streaming_callback`` is not used.
"""
- assert not self._read_callback, "Already reading"
+ self._set_read_callback(callback)
if self.closed():
self._run_callback(callback, self._consume(self._read_buffer_size))
+ self._read_callback = None
return
self._read_until_close = True
- self._read_callback = stack_context.wrap(callback)
self._streaming_callback = stack_context.wrap(streaming_callback)
self._add_io_state(self.io_loop.READ)
@@ -327,11 +327,17 @@ def _handle_read(self):
if self._read_from_buffer():
return
- def _read_until(self, callback):
- """Assign given read callback and initiate read to buffer
- unless stream has already been read or closed.
+ def _set_read_callback(self, callback):
+ assert not self._read_callback, "Already reading"
+ self._read_callback = callback
+
+ def _try_inline_read(self):
+ """Attempt to complete the current read operation from buffered data.
+
+ If the read can be completed without blocking, schedules the
+ read callback on the next IOLoop iteration; otherwise starts
+ listening for reads on the socket.
"""
- self._read_callback = stack_context.wrap(callback)
while True:
# See if we've already got the data from a previous read
if self._read_from_buffer():

0 comments on commit 2db0ace

Please sign in to comment.
Something went wrong with that request. Please try again.