Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline response parsing tests #137

Merged
merged 2 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,46 @@ async def send(writer: asyncio.StreamWriter) -> None:
writer.write(proxy.build_http_request(
proxy.httpMethods.GET, b'/'
))
# await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
except KeyboardInterrupt:
pass

@staticmethod
def parse_pipeline_response(response: proxy.HttpParser, raw: bytes, counter: int = 0) -> \
        Tuple[proxy.HttpParser, int]:
    """Feed *raw* into *response*, then drain any pipelined responses
    left over in the parser's buffer.

    Returns the parser holding the last (possibly partial) response and
    the running count of fully parsed responses.
    """
    # Iterative drain — equivalent to recursing on the leftover buffer.
    while True:
        response.parse(raw)
        if response.state != proxy.httpParserStates.COMPLETE:
            # Partial response; caller must read and supply more data.
            return response, counter
        counter += 1
        if response.buffer == b'':
            # All bytes consumed cleanly; nothing pipelined behind.
            return response, counter
        # Remaining buffered bytes belong to the next pipelined
        # response — start a fresh parser on them.
        raw = response.buffer
        response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)

@staticmethod
async def recv(idd: int, reader: asyncio.StreamReader) -> None:
last_status_time = time.time()
num_completed_requests_per_connection: int = 0
print_every = 1000
last_print = time.time()
num_completed_requests: int = 0
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
try:
while True:
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
while response.state != proxy.httpParserStates.COMPLETE:
raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE)
print(raw)
response.parse(raw)

num_completed_requests_per_connection += 1
if num_completed_requests_per_connection % 50 == 0:
now = time.time()
print('[%d] Made 50 requests in last %.2f seconds' % (idd, now - last_status_time))
last_status_time = now
raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE)
response, total_parsed = Benchmark.parse_pipeline_response(response, raw)
if response.state == proxy.httpParserStates.COMPLETE:
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
if total_parsed > 0:
num_completed_requests += total_parsed
# print('total parsed %d' % total_parsed)
if num_completed_requests % print_every == 0:
now = time.time()
print('[%d] Completed last %d requests in %.2f secs' %
(idd, print_every, now - last_print))
last_print = now
except KeyboardInterrupt:
pass

Expand Down
34 changes: 23 additions & 11 deletions proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,16 @@ def build_http_response(status_code: int,
line.append(reason)
if headers is None:
headers = {}
if body is not None and not any(
k.lower() == b'content-length' for k in headers):
has_content_length = False
has_transfer_encoding = False
for k in headers:
if k.lower() == b'content-length':
has_content_length = True
if k.lower() == b'transfer-encoding':
has_transfer_encoding = True
if body is not None and \
not has_transfer_encoding and \
not has_content_length:
headers[b'Content-Length'] = bytes_(len(body))
return build_http_pkt(line, headers, body)

Expand Down Expand Up @@ -501,10 +509,11 @@ def __init__(self) -> None:
# Expected size of next following chunk
self.size: Optional[int] = None

def parse(self, raw: bytes) -> None:
def parse(self, raw: bytes) -> bytes:
more = True if len(raw) > 0 else False
while more:
while more and self.state != chunkParserStates.COMPLETE:
more, raw = self.process(raw)
return raw

def process(self, raw: bytes) -> Tuple[bool, bytes]:
if self.state == chunkParserStates.WAITING_FOR_SIZE:
Expand Down Expand Up @@ -651,27 +660,29 @@ def parse(self, raw: bytes) -> None:
self.buffer = b''

more = True if len(raw) > 0 else False
while more:
while more and self.state != httpParserStates.COMPLETE:
if self.state in (
httpParserStates.HEADERS_COMPLETE,
httpParserStates.RCVING_BODY,
httpParserStates.COMPLETE):
httpParserStates.RCVING_BODY):
if b'content-length' in self.headers:
self.state = httpParserStates.RCVING_BODY
if self.body is None:
self.body = b''
self.body += raw
total_size = int(self.header(b'content-length'))
received_size = len(self.body)
self.body += raw[:total_size - received_size]
if self.body and \
len(self.body) >= int(self.header(b'content-length')):
len(self.body) == int(self.header(b'content-length')):
self.state = httpParserStates.COMPLETE
more, raw = len(raw) > 0, raw[total_size - received_size:]
elif self.is_chunked_encoded():
if not self.chunk_parser:
self.chunk_parser = ChunkParser()
self.chunk_parser.parse(raw)
raw = self.chunk_parser.parse(raw)
if self.chunk_parser.state == chunkParserStates.COMPLETE:
self.body = self.chunk_parser.body
self.state = httpParserStates.COMPLETE
more, raw = False, b''
more = False
else:
more, raw = self.process(raw)
self.buffer = raw
Expand Down Expand Up @@ -713,6 +724,7 @@ def process(self, raw: bytes) -> Tuple[bool, bytes]:
elif self.state == httpParserStates.HEADERS_COMPLETE and \
self.type == httpParserTypes.REQUEST_PARSER and \
self.method == httpMethods.POST and \
not self.is_chunked_encoded() and \
(b'content-length' not in self.headers or
(b'content-length' in self.headers and
int(self.headers[b'content-length'][1]) == 0)) and \
Expand Down
35 changes: 35 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,41 @@ def test_chunked_response_parse(self) -> None:
self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.')
self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)

def test_pipelined_response_parse(self) -> None:
    """Two back-to-back Content-Length responses must both parse."""
    body = b'{"key":"value"}'
    response = proxy.build_http_response(
        proxy.httpStatusCodes.OK,
        reason=b'OK',
        headers={b'Content-Length': b'15'},
        body=body,
    )
    self.assert_pipeline_response(response)

def test_pipelined_chunked_response_parse(self) -> None:
    """Two back-to-back chunked-encoded responses must both parse."""
    # One 15-byte (0xf) chunk followed by the zero-length terminator.
    chunked_body = b'f\r\n{"key":"value"}\r\n0\r\n\r\n'
    response = proxy.build_http_response(
        proxy.httpStatusCodes.OK,
        reason=b'OK',
        headers={
            b'Transfer-Encoding': b'chunked',
            b'Content-Type': b'application/json',
        },
        body=chunked_body,
    )
    self.assert_pipeline_response(response)

def assert_pipeline_response(self, response: bytes) -> None:
    """Parse two concatenated copies of *response*: the first must
    complete with the second copy intact in the buffer, and that
    buffer must itself parse to completion with nothing left over."""
    self.parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    self.parser.parse(response * 2)
    # First response fully parsed; second copy pending in the buffer.
    self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)
    self.assertEqual(self.parser.body, b'{"key":"value"}')
    self.assertEqual(self.parser.buffer, response)

    # The leftover buffer is exactly one more complete response.
    pipelined = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    pipelined.parse(self.parser.buffer)
    self.assertEqual(pipelined.state, proxy.httpParserStates.COMPLETE)
    self.assertEqual(pipelined.body, b'{"key":"value"}')
    self.assertEqual(pipelined.buffer, b'')

def test_chunked_request_parse(self) -> None:
self.parser.parse(proxy.build_http_request(
proxy.httpMethods.POST, b'http://example.org/',
Expand Down