Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Commit

Permalink
Merge pull request #89 from Lukasa/readline
Browse files Browse the repository at this point in the history
Initial buffered socket readline proposal.
  • Loading branch information
Lukasa committed Feb 27, 2015
2 parents 9103cb2 + d018249 commit 4bb3a89
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 13 deletions.
99 changes: 86 additions & 13 deletions hyper/http20/bufsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
process.
"""
import select
from .exceptions import ConnectionResetError
from .exceptions import ConnectionResetError, LineTooLongError

class BufferedSocket(object):
"""
Expand Down Expand Up @@ -76,6 +76,25 @@ def can_read(self):

return False

def new_buffer(self):
"""
This method moves all the data in the backing buffer to the start of
a new, fresh buffer. This gives the ability to read much more data.
"""
def read_all_from_buffer():
end = self._index + self._bytes_in_buffer
return self._buffer_view[self._index:end]

new_buffer = bytearray(self._buffer_size)
new_buffer_view = memoryview(new_buffer)
new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer()

self._index = 0
self._backing_buffer = new_buffer
self._buffer_view = new_buffer_view

return

def recv(self, amt):
"""
Read some data from the socket.
Expand All @@ -85,26 +104,16 @@ def recv(self, amt):
bytes. The data *must* be copied out by the caller before the next
call to this function.
"""
def read_all_from_buffer():
end = self._index + self._bytes_in_buffer
return self._buffer_view[self._index:end]

# In this implementation you can never read more than the number of
# bytes in the buffer.
if amt > self._buffer_size:
amt = self._buffer_size

# If the amount of data we've been asked to read is less than the
# remaining space in the buffer, we need to clear out the buffer and
# start over. Copy the data into the new array.
# start over.
if amt > self._remaining_capacity:
new_buffer = bytearray(self._buffer_size)
new_buffer_view = memoryview(new_buffer)
new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer()

self._index = 0
self._backing_buffer = new_buffer
self._buffer_view = new_buffer_view
self.new_buffer()

# If there's still some room in the buffer, opportunistically attempt
# to read into it.
Expand Down Expand Up @@ -136,5 +145,69 @@ def read_all_from_buffer():

return data

def readline(self):
"""
Read up to a newline from the network and returns it. The implicit
maximum line length is the buffer size of the buffered socket.
Note that, unlike recv, this method absolutely *does* block until it
can read the line.
:returns: A ``memoryview`` object containing the appropriate number of
bytes. The data *must* be copied out by the caller before the next
call to this function.
"""
# First, check if there's anything in the buffer. This is one of those
# rare circumstances where this will work correctly on all platforms.
index = self._backing_buffer.find(
b'\n',
self._index,
self._index + self._bytes_in_buffer
)

if index != -1:
length = index + 1 - self._index
data = self._buffer_view[self._index:self._index+length]
self._index += length
self._bytes_in_buffer -= length
return data

# In this case, we didn't find a newline in the buffer. To fix that,
# read some data into the buffer. To do our best to satisfy the read,
# we should shunt the data down in the buffer so that it's right at
# the start. We don't bother if we're already at the start of the
# buffer.
if self._index != 0:
self.new_buffer()

while self._bytes_in_buffer < self._buffer_size:
count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
if not count:
raise ConnectionResetError()

# We have some more data. Again, look for a newline in that gap.
first_new_byte = self._buffer_end
self._bytes_in_buffer += count
index = self._backing_buffer.find(
b'\n',
first_new_byte,
first_new_byte + count,
)

if index != -1:
# The length of the buffer is the index into the
# buffer at which we found the newline plus 1, minus the start
# index of the buffer, which really should be zero.
assert not self._index
length = index + 1
data = self._buffer_view[:length]
self._index += length
self._bytes_in_buffer -= length
return data

# If we got here, it means we filled the buffer without ever getting
# a newline. Time to throw an exception.
raise LineTooLongError()

def __getattr__(self, name):
return getattr(self._sck, name)
15 changes: 15 additions & 0 deletions hyper/http20/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@
This defines exceptions used in the HTTP/2 portion of hyper.
"""
class SocketError(Exception):
"""
An error occurred during socket operation.
"""
pass


class LineTooLongError(Exception):
"""
An attempt to read a line from a socket failed because no newline was
found.
"""
pass


class HTTP20Error(Exception):
"""
The base class for all of ``hyper``'s HTTP/2-related exceptions.
Expand Down
87 changes: 87 additions & 0 deletions test/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
Test the BufferedSocket implementation in hyper.
"""
import pytest

import hyper.http20.bufsocket
from hyper.http20.bufsocket import BufferedSocket
from hyper.http20.exceptions import ConnectionResetError, LineTooLongError

# Patch the select method in bufsocket to make sure that it always returns
# the dummy socket as readable.
Expand Down Expand Up @@ -120,6 +123,90 @@ def test_oversized_read(self, monkeypatch):
d = b.recv(1200).tobytes()
assert d == b'a' * 600

def test_readline_from_buffer(self, monkeypatch):
monkeypatch.setattr(
hyper.http20.bufsocket.select, 'select', dummy_select
)
s = DummySocket()
b = BufferedSocket(s)

one = b'hi there\r\n'
two = b'this is another line\r\n'
three = b'\r\n'
combined = b''.join([one, two, three])
b._buffer_view[0:len(combined)] = combined
b._bytes_in_buffer += len(combined)

assert b.readline().tobytes() == one
assert b.readline().tobytes() == two
assert b.readline().tobytes() == three

def test_readline_from_socket(self, monkeypatch):
monkeypatch.setattr(
hyper.http20.bufsocket.select, 'select', dummy_select
)
s = DummySocket()
b = BufferedSocket(s)

one = b'hi there\r\n'
two = b'this is another line\r\n'
three = b'\r\n'
combined = b''.join([one, two, three])

for i in range(0, len(combined), 5):
s.inbound_packets.append(combined[i:i+5])

assert b.readline().tobytes() == one
assert b.readline().tobytes() == two
assert b.readline().tobytes() == three

def test_readline_both(self, monkeypatch):
monkeypatch.setattr(
hyper.http20.bufsocket.select, 'select', dummy_select
)
s = DummySocket()
b = BufferedSocket(s)

one = b'hi there\r\n'
two = b'this is another line\r\n'
three = b'\r\n'
combined = b''.join([one, two, three])

split_index = int(len(combined) / 2)

b._buffer_view[0:split_index] = combined[0:split_index]
b._bytes_in_buffer += split_index

for i in range(split_index, len(combined), 5):
s.inbound_packets.append(combined[i:i+5])

assert b.readline().tobytes() == one
assert b.readline().tobytes() == two
assert b.readline().tobytes() == three

def test_socket_error_on_readline(self, monkeypatch):
monkeypatch.setattr(
hyper.http20.bufsocket.select, 'select', dummy_select
)
s = DummySocket()
b = BufferedSocket(s)

with pytest.raises(ConnectionResetError):
b.readline()

def test_socket_readline_too_long(self, monkeypatch):
monkeypatch.setattr(
hyper.http20.bufsocket.select, 'select', dummy_select
)
s = DummySocket()
b = BufferedSocket(s)

b._buffer_view[0:b._buffer_size] = b'0' * b._buffer_size
b._bytes_in_buffer = b._buffer_size

with pytest.raises(LineTooLongError):
b.readline()


class DummySocket(object):
def __init__(self):
Expand Down

0 comments on commit 4bb3a89

Please sign in to comment.