Add pipeline response parsing tests (#137)
* Add pipeline response parsing tests

* build_http_response now adds a Content-Length header only when Transfer-Encoding is
not provided.

Also return pending raw chunks from ChunkParser so that pipelined chunked
responses can be parsed, as sketched below.
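
A minimal sketch of both changes, assuming the proxy module API exercised by the new tests below (build_http_response, HttpParser and the httpParserTypes/httpParserStates constants), and mirroring assert_pipeline_response in tests.py:

    import proxy

    # With Transfer-Encoding present, build_http_response should no longer
    # inject a Content-Length header derived from the body.
    chunked = proxy.build_http_response(
        proxy.httpStatusCodes.OK, reason=b'OK',
        headers={b'Transfer-Encoding': b'chunked'},
        body=b'f\r\n{"key":"value"}\r\n0\r\n\r\n',
    )
    assert b'content-length' not in chunked.lower()

    # Two pipelined copies of the response: the first parses to COMPLETE and
    # the untouched second response is preserved in .buffer, which can then
    # be fed to a fresh parser.
    parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    parser.parse(chunked + chunked)
    assert parser.state == proxy.httpParserStates.COMPLETE
    assert parser.body == b'{"key":"value"}'
    assert parser.buffer == chunked

    pending = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    pending.parse(parser.buffer)
    assert pending.state == proxy.httpParserStates.COMPLETE
    assert pending.body == b'{"key":"value"}'
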
abhinavsingh committed Oct 16, 2019
1 parent 69445a8 commit c77f8b5
Showing 3 changed files with 91 additions and 25 deletions.
47 changes: 33 additions & 14 deletions benchmark.py
@@ -56,27 +56,46 @@ async def send(writer: asyncio.StreamWriter) -> None:
writer.write(proxy.build_http_request(
proxy.httpMethods.GET, b'/'
))
# await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
except KeyboardInterrupt:
pass

@staticmethod
def parse_pipeline_response(response: proxy.HttpParser, raw: bytes, counter: int = 0) -> \
Tuple[proxy.HttpParser, int]:
response.parse(raw)
if response.state != proxy.httpParserStates.COMPLETE:
# Need more data
return response, counter

if response.buffer == b'':
# No more buffer left to parse
return response, counter + 1

# For pipelined requests we may have a pending buffer; try to parse it as more responses
pipelined_response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
return Benchmark.parse_pipeline_response(pipelined_response, response.buffer, counter + 1)

@staticmethod
async def recv(idd: int, reader: asyncio.StreamReader) -> None:
last_status_time = time.time()
num_completed_requests_per_connection: int = 0
print_every = 1000
last_print = time.time()
num_completed_requests: int = 0
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
try:
while True:
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
while response.state != proxy.httpParserStates.COMPLETE:
raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE)
print(raw)
response.parse(raw)

num_completed_requests_per_connection += 1
if num_completed_requests_per_connection % 50 == 0:
now = time.time()
print('[%d] Made 50 requests in last %.2f seconds' % (idd, now - last_status_time))
last_status_time = now
raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE)
response, total_parsed = Benchmark.parse_pipeline_response(response, raw)
if response.state == proxy.httpParserStates.COMPLETE:
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
if total_parsed > 0:
num_completed_requests += total_parsed
# print('total parsed %d' % total_parsed)
if num_completed_requests % print_every == 0:
now = time.time()
print('[%d] Completed last %d requests in %.2f secs' %
(idd, print_every, now - last_print))
last_print = now
except KeyboardInterrupt:
pass

34 changes: 23 additions & 11 deletions proxy.py
@@ -257,8 +257,16 @@ def build_http_response(status_code: int,
line.append(reason)
if headers is None:
headers = {}
if body is not None and not any(
k.lower() == b'content-length' for k in headers):
has_content_length = False
has_transfer_encoding = False
for k in headers:
if k.lower() == b'content-length':
has_content_length = True
if k.lower() == b'transfer-encoding':
has_transfer_encoding = True
if body is not None and \
not has_transfer_encoding and \
not has_content_length:
headers[b'Content-Length'] = bytes_(len(body))
return build_http_pkt(line, headers, body)

@@ -501,10 +509,11 @@ def __init__(self) -> None:
# Expected size of next following chunk
self.size: Optional[int] = None

def parse(self, raw: bytes) -> None:
def parse(self, raw: bytes) -> bytes:
more = True if len(raw) > 0 else False
while more:
while more and self.state != chunkParserStates.COMPLETE:
more, raw = self.process(raw)
return raw

def process(self, raw: bytes) -> Tuple[bool, bytes]:
if self.state == chunkParserStates.WAITING_FOR_SIZE:
@@ -651,27 +660,29 @@ def parse(self, raw: bytes) -> None:
self.buffer = b''

more = True if len(raw) > 0 else False
while more:
while more and self.state != httpParserStates.COMPLETE:
if self.state in (
httpParserStates.HEADERS_COMPLETE,
httpParserStates.RCVING_BODY,
httpParserStates.COMPLETE):
httpParserStates.RCVING_BODY):
if b'content-length' in self.headers:
self.state = httpParserStates.RCVING_BODY
if self.body is None:
self.body = b''
self.body += raw
total_size = int(self.header(b'content-length'))
received_size = len(self.body)
self.body += raw[:total_size - received_size]
if self.body and \
len(self.body) >= int(self.header(b'content-length')):
len(self.body) == int(self.header(b'content-length')):
self.state = httpParserStates.COMPLETE
more, raw = len(raw) > 0, raw[total_size - received_size:]
elif self.is_chunked_encoded():
if not self.chunk_parser:
self.chunk_parser = ChunkParser()
self.chunk_parser.parse(raw)
raw = self.chunk_parser.parse(raw)
if self.chunk_parser.state == chunkParserStates.COMPLETE:
self.body = self.chunk_parser.body
self.state = httpParserStates.COMPLETE
more, raw = False, b''
more = False
else:
more, raw = self.process(raw)
self.buffer = raw
@@ -713,6 +724,7 @@ def process(self, raw: bytes) -> Tuple[bool, bytes]:
elif self.state == httpParserStates.HEADERS_COMPLETE and \
self.type == httpParserTypes.REQUEST_PARSER and \
self.method == httpMethods.POST and \
not self.is_chunked_encoded() and \
(b'content-length' not in self.headers or
(b'content-length' in self.headers and
int(self.headers[b'content-length'][1]) == 0)) and \
35 changes: 35 additions & 0 deletions tests.py
@@ -874,6 +874,41 @@ def test_chunked_response_parse(self) -> None:
self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.')
self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)

def test_pipelined_response_parse(self) -> None:
response = proxy.build_http_response(
proxy.httpStatusCodes.OK, reason=b'OK',
headers={
b'Content-Length': b'15'
},
body=b'{"key":"value"}',
)
self.assert_pipeline_response(response)

def test_pipelined_chunked_response_parse(self) -> None:
response = proxy.build_http_response(
proxy.httpStatusCodes.OK, reason=b'OK',
headers={
b'Transfer-Encoding': b'chunked',
b'Content-Type': b'application/json',
},
body=b'f\r\n{"key":"value"}\r\n0\r\n\r\n'
)
self.assert_pipeline_response(response)

def assert_pipeline_response(self, response: bytes) -> None:
self.parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
self.parser.parse(response + response)
self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)
self.assertEqual(self.parser.body, b'{"key":"value"}')
self.assertEqual(self.parser.buffer, response)

# parse buffer
parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
parser.parse(self.parser.buffer)
self.assertEqual(parser.state, proxy.httpParserStates.COMPLETE)
self.assertEqual(parser.body, b'{"key":"value"}')
self.assertEqual(parser.buffer, b'')

def test_chunked_request_parse(self) -> None:
self.parser.parse(proxy.build_http_request(
proxy.httpMethods.POST, b'http://example.org/',
