From 4bfac10fcd1927816c647bc3a7470843cdade72d Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 1 Dec 2021 08:05:17 +0530 Subject: [PATCH 1/2] No content length or chunked case can occur with `HTTP/1.1` too --- Makefile | 1 - proxy/core/acceptor/threadless.py | 13 +++++++------ proxy/core/connection/connection.py | 2 +- proxy/http/handler.py | 10 ++++++++++ proxy/http/parser/parser.py | 4 +++- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 43e520ef3d..95da7457bd 100644 --- a/Makefile +++ b/Makefile @@ -141,7 +141,6 @@ lib-profile: --hostname 127.0.0.1 \ --num-acceptors 1 \ --num-workers 1 \ - --disable-http-proxy \ --enable-web-server \ --plugin proxy.plugin.WebServerPlugin \ --local-executor \ diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 5a0b325735..fcf7ad8872 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -224,16 +224,13 @@ async def _selected_events(self) -> Tuple[ work_by_ids[key.data][1].append(key.fileobj) return (work_by_ids, new_work_available) - async def _wait_for_tasks(self) -> None: + async def _wait_for_tasks(self) -> Set['asyncio.Task[bool]']: finished, self.unfinished = await asyncio.wait( self.unfinished, timeout=self.wait_timeout, return_when=asyncio.FIRST_COMPLETED, ) - for task in finished: - if task.result(): - self._cleanup(task._work_id) # type: ignore - # self.cleanup(int(task.get_name())) + return finished def _fromfd(self, fileno: int) -> socket.socket: return socket.fromfd( @@ -299,7 +296,11 @@ async def _run_once(self) -> bool: # Invoke Threadless.handle_events self.unfinished.update(self._create_tasks(work_by_ids)) # logger.debug('Executing {0} works'.format(len(self.unfinished))) - await self._wait_for_tasks() + # Cleanup finished tasks + for task in await self._wait_for_tasks(): + if task.result(): + self._cleanup(task._work_id) # type: ignore + # self.cleanup(int(task.get_name())) # logger.debug( # 'Done executing works, {0} pending, {1} registered'.format( # len(self.unfinished), len(self.registered_events_by_work_ids), diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 69036755aa..84c6c9de74 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -75,7 +75,7 @@ def close(self) -> bool: return self.closed def has_buffer(self) -> bool: - return self._num_buffer > 0 + return self._num_buffer != 0 def queue(self, mv: memoryview) -> None: self.buffer.append(mv) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index b6cd6f250d..bcb76ceb96 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -117,6 +117,7 @@ def is_inactive(self) -> bool: def shutdown(self) -> None: try: # Flush pending buffer in threaded mode only. + # # For threadless mode, BaseTcpServerHandler implements # the must_flush_before_shutdown logic automagically. if self.selector and self.work.has_buffer(): @@ -139,6 +140,15 @@ def shutdown(self) -> None: except OSError: pass finally: + # Section 4.2.2.13 of RFC 1122 tells us that a close() with any pending readable data + # could lead to an immediate reset being sent. + # + # "A host MAY implement a 'half-duplex' TCP close sequence, so that an application + # that has called CLOSE cannot continue to read data from the connection. + # If such a host issues a CLOSE call while received data is still pending in TCP, + # or if new data is received after CLOSE is called, its TCP SHOULD send a RST to + # show that data was lost." + # self.work.connection.close() logger.debug('Client connection closed') super().shutdown() diff --git a/proxy/http/parser/parser.py b/proxy/http/parser/parser.py index 2785b246ec..3554cf2bfe 100644 --- a/proxy/http/parser/parser.py +++ b/proxy/http/parser/parser.py @@ -358,7 +358,9 @@ def _process_line(self, raw: bytes) -> None: line = raw.split(WHITESPACE, 2) self.version = line[0] self.code = line[1] - self.reason = line[2] + # Our own WebServerPlugin example currently doesn't send any reason + if len(line) == 3: + self.reason = line[2] self.state = httpParserStates.LINE_RCVD def _process_header(self, raw: bytes) -> None: From da263dc062bbaf19ab02fce5b6ac6eb697f13aea Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 1 Dec 2021 08:13:27 +0530 Subject: [PATCH 2/2] `WPS331` false-positive --- proxy/core/acceptor/threadless.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index fcf7ad8872..1c09167271 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -230,7 +230,7 @@ async def _wait_for_tasks(self) -> Set['asyncio.Task[bool]']: timeout=self.wait_timeout, return_when=asyncio.FIRST_COMPLETED, ) - return finished + return finished # noqa: WPS331 def _fromfd(self, fileno: int) -> socket.socket: return socket.fromfd(