1 change: 0 additions & 1 deletion Makefile
@@ -141,7 +141,6 @@ lib-profile:
 		--hostname 127.0.0.1 \
 		--num-acceptors 1 \
 		--num-workers 1 \
-		--disable-http-proxy \
 		--enable-web-server \
 		--plugin proxy.plugin.WebServerPlugin \
 		--local-executor \
13 changes: 7 additions & 6 deletions proxy/core/acceptor/threadless.py
@@ -224,16 +224,13 @@ async def _selected_events(self) -> Tuple[
                 work_by_ids[key.data][1].append(key.fileobj)
         return (work_by_ids, new_work_available)
 
-    async def _wait_for_tasks(self) -> None:
+    async def _wait_for_tasks(self) -> Set['asyncio.Task[bool]']:
         finished, self.unfinished = await asyncio.wait(
             self.unfinished,
             timeout=self.wait_timeout,
             return_when=asyncio.FIRST_COMPLETED,
         )
-        for task in finished:
-            if task.result():
-                self._cleanup(task._work_id)  # type: ignore
-                # self.cleanup(int(task.get_name()))
+        return finished  # noqa: WPS331
 
     def _fromfd(self, fileno: int) -> socket.socket:
         return socket.fromfd(
@@ -299,7 +296,11 @@ async def _run_once(self) -> bool:
         # Invoke Threadless.handle_events
         self.unfinished.update(self._create_tasks(work_by_ids))
         # logger.debug('Executing {0} works'.format(len(self.unfinished)))
-        await self._wait_for_tasks()
+        # Cleanup finished tasks
+        for task in await self._wait_for_tasks():
+            if task.result():
+                self._cleanup(task._work_id)  # type: ignore
+                # self.cleanup(int(task.get_name()))
         # logger.debug(
         #     'Done executing works, {0} pending, {1} registered'.format(
         #         len(self.unfinished), len(self.registered_events_by_work_ids),
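Taken together, the two hunks above move task cleanup out of `_wait_for_tasks` and into `_run_once`, so the waiter only awaits and reports what finished. A minimal standalone sketch of that split (names, timeouts, and the driver loop are illustrative, not proxy.py's internals):

```python
import asyncio
from typing import Set

async def wait_for_tasks(
        unfinished: Set['asyncio.Task[bool]'],
        wait_timeout: float,
) -> Set['asyncio.Task[bool]']:
    # Return as soon as any task completes (or the timeout expires),
    # leaving still-pending tasks in `unfinished` for the next iteration.
    finished, pending = await asyncio.wait(
        unfinished,
        timeout=wait_timeout,
        return_when=asyncio.FIRST_COMPLETED,
    )
    unfinished.clear()
    unfinished.update(pending)
    return finished

async def run_once(unfinished: Set['asyncio.Task[bool]']) -> None:
    # The caller, not the waiter, inspects results and reaps finished work.
    for task in await wait_for_tasks(unfinished, wait_timeout=0.1):
        if task.result():   # True signals the work can be cleaned up
            print('cleaning up', task.get_name())

async def main() -> None:
    async def work(teardown: bool) -> bool:
        await asyncio.sleep(0.01)
        return teardown

    unfinished = {asyncio.create_task(work(flag)) for flag in (True, False)}
    while unfinished:
        await run_once(unfinished)

asyncio.run(main())
```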
2 changes: 1 addition & 1 deletion proxy/core/connection/connection.py
@@ -75,7 +75,7 @@ def close(self) -> bool:
         return self.closed
 
     def has_buffer(self) -> bool:
-        return self._num_buffer > 0
+        return self._num_buffer != 0
 
     def queue(self, mv: memoryview) -> None:
         self.buffer.append(mv)
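Since `_num_buffer` is a counter that never goes negative, `!= 0` and `> 0` are equivalent here; the swap looks like a micro-optimization of a hot-path check. A quick, unscientific way to compare the two comparisons under CPython:

```python
import timeit

# The counter is non-negative, so both expressions agree; this only
# measures whether `!= 0` is cheaper than `> 0` on a given interpreter.
n = 5
print('n > 0 :', timeit.timeit('n > 0', globals={'n': n}))
print('n != 0:', timeit.timeit('n != 0', globals={'n': n}))
```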
10 changes: 10 additions & 0 deletions proxy/http/handler.py
@@ -117,6 +117,7 @@ def is_inactive(self) -> bool:
     def shutdown(self) -> None:
         try:
             # Flush pending buffer in threaded mode only.
+            #
             # For threadless mode, BaseTcpServerHandler implements
             # the must_flush_before_shutdown logic automagically.
             if self.selector and self.work.has_buffer():
@@ -139,6 +140,15 @@ def shutdown(self) -> None:
         except OSError:
             pass
         finally:
+            # Section 4.2.2.13 of RFC 1122 tells us that a close() with any pending readable data
+            # could lead to an immediate reset being sent.
+            #
+            # "A host MAY implement a 'half-duplex' TCP close sequence, so that an application
+            # that has called CLOSE cannot continue to read data from the connection.
+            # If such a host issues a CLOSE call while received data is still pending in TCP,
+            # or if new data is received after CLOSE is called, its TCP SHOULD send a RST to
+            # show that data was lost."
+            #
             self.work.connection.close()
             logger.debug('Client connection closed')
         super().shutdown()
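The RFC 1122 behavior quoted in the new comment is why a graceful close typically signals EOF first and drains any pending input before releasing the socket. A minimal sketch of that pattern (not proxy.py code, just the idea the comment describes):

```python
import socket

def graceful_close(sock: socket.socket, timeout: float = 1.0) -> None:
    """Close `sock` while avoiding the RST described in RFC 1122 4.2.2.13."""
    try:
        sock.shutdown(socket.SHUT_WR)   # send FIN; we are done writing
        sock.settimeout(timeout)
        while True:
            # Drain anything still in flight so close() is not issued
            # with readable data pending; b'' means the peer closed too.
            if not sock.recv(4096):
                break
    except OSError:
        pass                            # peer may already be gone
    finally:
        sock.close()
```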
4 changes: 3 additions & 1 deletion proxy/http/parser/parser.py
@@ -358,7 +358,9 @@ def _process_line(self, raw: bytes) -> None:
             line = raw.split(WHITESPACE, 2)
             self.version = line[0]
             self.code = line[1]
-            self.reason = line[2]
+            # Our own WebServerPlugin example currently doesn't send any reason
+            if len(line) == 3:
+                self.reason = line[2]
             self.state = httpParserStates.LINE_RCVD
 
     def _process_header(self, raw: bytes) -> None:
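The guard is easiest to see with the two possible status-line shapes: with `maxsplit=2`, a response that omits the reason phrase (which HTTP permits; the reason phrase may be empty) yields only two parts, so the old unconditional `line[2]` raised IndexError. A small illustration, assuming `WHITESPACE` is a single space as in the split call above:

```python
WHITESPACE = b' '  # assumption for illustration

line = b'HTTP/1.1 200 OK'.split(WHITESPACE, 2)
print(line)   # [b'HTTP/1.1', b'200', b'OK'] -> reason phrase present

line = b'HTTP/1.1 200'.split(WHITESPACE, 2)
print(line)   # [b'HTTP/1.1', b'200'] -> line[2] would raise IndexError
```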