From bf62c694319c01abb063e1c69864a170884546fd Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 28 Dec 2021 12:46:55 +0530 Subject: [PATCH 1/5] Tie connection pool into Threadless --- proxy/core/acceptor/threadless.py | 3 ++- proxy/core/acceptor/work.py | 3 +++ proxy/core/connection/__init__.py | 4 ++-- proxy/core/connection/pool.py | 11 ++++++++++- proxy/http/handler.py | 1 + proxy/http/plugin.py | 4 +++- proxy/http/proxy/server.py | 13 +++++-------- tests/core/test_conn_pool.py | 6 +++--- 8 files changed, 29 insertions(+), 16 deletions(-) diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 5140c9345b..624da88227 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -25,7 +25,7 @@ from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT from ...common.constants import DEFAULT_WAIT_FOR_TASKS_TIMEOUT -from ..connection import TcpClientConnection +from ..connection import TcpClientConnection, UpstreamConnectionPool from ..event import eventNames, EventQueue from .work import Work @@ -87,6 +87,7 @@ def __init__( self.wait_timeout: float = DEFAULT_WAIT_FOR_TASKS_TIMEOUT self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT self._total: int = 0 + self._upstream_conn_pool: UpstreamConnectionPool = UpstreamConnectionPool() @property @abstractmethod diff --git a/proxy/core/acceptor/work.py b/proxy/core/acceptor/work.py index 5a7ba0723d..3de0737d6d 100644 --- a/proxy/core/acceptor/work.py +++ b/proxy/core/acceptor/work.py @@ -19,6 +19,7 @@ from typing import Optional, Dict, Any, TypeVar, Generic from ..event import eventNames, EventQueue +from ..connection import UpstreamConnectionPool from ...common.types import Readables, Writables T = TypeVar('T') @@ -30,6 +31,7 @@ class Work(ABC, Generic[T]): def __init__( self, work: T, + upstream_conn_pool: UpstreamConnectionPool, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None, uid: Optional[str] = None, @@ -41,6 +43,7 @@ def __init__( self.event_queue = event_queue # Accept work self.work = work + self.upstream_conn_pool = upstream_conn_pool @abstractmethod async def get_events(self) -> Dict[int, int]: diff --git a/proxy/core/connection/__init__.py b/proxy/core/connection/__init__.py index 952ee08f9e..58d100a81b 100644 --- a/proxy/core/connection/__init__.py +++ b/proxy/core/connection/__init__.py @@ -16,7 +16,7 @@ from .connection import TcpConnection, TcpConnectionUninitializedException from .client import TcpClientConnection from .server import TcpServerConnection -from .pool import ConnectionPool +from .pool import UpstreamConnectionPool from .types import tcpConnectionTypes __all__ = [ @@ -25,5 +25,5 @@ 'TcpServerConnection', 'TcpClientConnection', 'tcpConnectionTypes', - 'ConnectionPool', + 'UpstreamConnectionPool', ] diff --git a/proxy/core/connection/pool.py b/proxy/core/connection/pool.py index 16cd5096b1..e66da4a3f5 100644 --- a/proxy/core/connection/pool.py +++ b/proxy/core/connection/pool.py @@ -17,6 +17,9 @@ from typing import Set, Dict, Tuple from ...common.flag import flags +from ...common.types import Readables, Writables + +from ..acceptor.work import Work from .server import TcpServerConnection @@ -31,7 +34,7 @@ ) -class ConnectionPool: +class UpstreamConnectionPool(Work[TcpServerConnection]): """Manages connection pool to upstream servers. `ConnectionPool` avoids need to reconnect with the upstream @@ -113,3 +116,9 @@ def release(self, conn: TcpServerConnection) -> None: assert not conn.is_reusable() # Reset for reusability conn.reset() + + async def get_events(self) -> Dict[int, int]: + return await super().get_events() + + async def handle_events(self, readables: Readables, writables: Writables) -> bool: + return await super().handle_events(readables, writables) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 38429df7de..ae6c0d66ca 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -100,6 +100,7 @@ def initialize(self) -> None: self.work, self.request, self.event_queue, + self.upstream_conn_pool, ) self.plugins[instance.name()] = instance logger.debug('Handling connection %r' % self.work.connection) diff --git a/proxy/http/plugin.py b/proxy/http/plugin.py index eafcd0539d..9c10355441 100644 --- a/proxy/http/plugin.py +++ b/proxy/http/plugin.py @@ -18,7 +18,7 @@ from ..common.types import Readables, Writables from ..core.event import EventQueue -from ..core.connection import TcpClientConnection +from ..core.connection import TcpClientConnection, UpstreamConnectionPool class HttpProtocolHandlerPlugin(ABC): @@ -50,12 +50,14 @@ def __init__( client: TcpClientConnection, request: HttpParser, event_queue: EventQueue, + upstream_conn_pool: UpstreamConnectionPool, ): self.uid: str = uid self.flags: argparse.Namespace = flags self.client: TcpClientConnection = client self.request: HttpParser = request self.event_queue = event_queue + self.upstream_conn_pool = upstream_conn_pool super().__init__() def name(self) -> str: diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index 55604abf63..68bc02dd56 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -44,7 +44,7 @@ from ...common.pki import gen_public_key, gen_csr, sign_csr from ...core.event import eventNames -from ...core.connection import TcpServerConnection, ConnectionPool +from ...core.connection import TcpServerConnection from ...core.connection import TcpConnectionUninitializedException from ...common.flag import flags @@ -140,9 +140,6 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin): # connection pool operations. lock = threading.Lock() - # Shared connection pool - pool = ConnectionPool() - def __init__( self, *args: Any, **kwargs: Any, @@ -203,7 +200,7 @@ def _close_and_release(self) -> bool: assert self.upstream and not self.upstream.closed self.upstream.closed = True with self.lock: - self.pool.release(self.upstream) + self.upstream_conn_pool.release(self.upstream) self.upstream = None return True @@ -393,7 +390,7 @@ def on_client_connection_close(self) -> None: if self.flags.enable_conn_pool: # Release the connection for reusability with self.lock: - self.pool.release(self.upstream) + self.upstream_conn_pool.release(self.upstream) return try: @@ -590,7 +587,7 @@ def connect_upstream(self) -> None: if host and port: if self.flags.enable_conn_pool: with self.lock: - created, self.upstream = self.pool.acquire( + created, self.upstream = self.upstream_conn_pool.acquire( text_(host), port, ) else: @@ -643,7 +640,7 @@ def connect_upstream(self) -> None: ) if self.flags.enable_conn_pool: with self.lock: - self.pool.release(self.upstream) + self.upstream_conn_pool.release(self.upstream) raise ProxyConnectionFailed( text_(host), port, repr(e), ) from e diff --git a/tests/core/test_conn_pool.py b/tests/core/test_conn_pool.py index 3eaad052f3..db3de3d7c0 100644 --- a/tests/core/test_conn_pool.py +++ b/tests/core/test_conn_pool.py @@ -12,14 +12,14 @@ from unittest import mock -from proxy.core.connection import ConnectionPool +from proxy.core.connection import UpstreamConnectionPool class TestConnectionPool(unittest.TestCase): @mock.patch('proxy.core.connection.pool.TcpServerConnection') def test_acquire_and_release_and_reacquire(self, mock_tcp_server_connection: mock.Mock) -> None: - pool = ConnectionPool() + pool = UpstreamConnectionPool() addr = ('localhost', 1234) # Mock mock_conn = mock_tcp_server_connection.return_value @@ -50,7 +50,7 @@ def test_acquire_and_release_and_reacquire(self, mock_tcp_server_connection: moc def test_closed_connections_are_removed_on_release( self, mock_tcp_server_connection: mock.Mock, ) -> None: - pool = ConnectionPool() + pool = UpstreamConnectionPool() addr = ('localhost', 1234) # Mock mock_conn = mock_tcp_server_connection.return_value From 8aa026da89ae54eaca73ab2546519a1465f4b823 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 28 Dec 2021 12:58:36 +0530 Subject: [PATCH 2/5] Pass upstream conn pool reference to work instances --- proxy/core/acceptor/threadless.py | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 624da88227..034da4b169 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -132,6 +132,7 @@ def work_on_tcp_conn( conn=conn, addr=addr, ), + self._upstream_conn_pool, flags=self.flags, event_queue=self.event_queue, uid=uid, From 1357e4dd400abc3fd838da1ac4ec925e9d2b3b75 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 28 Dec 2021 13:22:29 +0530 Subject: [PATCH 3/5] Mark upstream conn pool as optional --- proxy/core/acceptor/executors.py | 1 + proxy/core/acceptor/threadless.py | 2 +- proxy/core/acceptor/work.py | 8 +++++--- proxy/http/plugin.py | 2 +- proxy/http/proxy/server.py | 5 ++++- tests/core/test_acceptor.py | 1 + 6 files changed, 13 insertions(+), 6 deletions(-) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 9512762671..d0c5a912c3 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -131,6 +131,7 @@ def start_threaded_work( TcpClientConnection(conn, addr), flags=flags, event_queue=event_queue, + upstream_conn_pool=None, ) # TODO: Keep reference to threads and join during shutdown. # This will ensure connections are not abruptly closed on shutdown diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 034da4b169..2f80d50daf 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -132,10 +132,10 @@ def work_on_tcp_conn( conn=conn, addr=addr, ), - self._upstream_conn_pool, flags=self.flags, event_queue=self.event_queue, uid=uid, + upstream_conn_pool=self._upstream_conn_pool, ) self.works[fileno].publish_event( event_name=eventNames.WORK_STARTED, diff --git a/proxy/core/acceptor/work.py b/proxy/core/acceptor/work.py index 3de0737d6d..37f0bf591b 100644 --- a/proxy/core/acceptor/work.py +++ b/proxy/core/acceptor/work.py @@ -16,12 +16,14 @@ from abc import ABC, abstractmethod from uuid import uuid4 -from typing import Optional, Dict, Any, TypeVar, Generic +from typing import Optional, Dict, Any, TypeVar, Generic, TYPE_CHECKING from ..event import eventNames, EventQueue -from ..connection import UpstreamConnectionPool from ...common.types import Readables, Writables +if TYPE_CHECKING: + from ..connection import UpstreamConnectionPool + T = TypeVar('T') @@ -31,10 +33,10 @@ class Work(ABC, Generic[T]): def __init__( self, work: T, - upstream_conn_pool: UpstreamConnectionPool, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None, uid: Optional[str] = None, + upstream_conn_pool: Optional['UpstreamConnectionPool'] = None, ) -> None: # Work uuid self.uid: str = uid if uid is not None else uuid4().hex diff --git a/proxy/http/plugin.py b/proxy/http/plugin.py index 9c10355441..2a0e8804dc 100644 --- a/proxy/http/plugin.py +++ b/proxy/http/plugin.py @@ -50,7 +50,7 @@ def __init__( client: TcpClientConnection, request: HttpParser, event_queue: EventQueue, - upstream_conn_pool: UpstreamConnectionPool, + upstream_conn_pool: Optional['UpstreamConnectionPool'] = None, ): self.uid: str = uid self.flags: argparse.Namespace = flags diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index 68bc02dd56..228d8ecc34 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -197,7 +197,7 @@ def get_descriptors(self) -> Tuple[List[int], List[int]]: def _close_and_release(self) -> bool: if self.flags.enable_conn_pool: - assert self.upstream and not self.upstream.closed + assert self.upstream and not self.upstream.closed and self.upstream_conn_pool self.upstream.closed = True with self.lock: self.upstream_conn_pool.release(self.upstream) @@ -388,6 +388,7 @@ def on_client_connection_close(self) -> None: return if self.flags.enable_conn_pool: + assert self.upstream_conn_pool # Release the connection for reusability with self.lock: self.upstream_conn_pool.release(self.upstream) @@ -586,6 +587,7 @@ def connect_upstream(self) -> None: host, port = self.request.host, self.request.port if host and port: if self.flags.enable_conn_pool: + assert self.upstream_conn_pool with self.lock: created, self.upstream = self.upstream_conn_pool.acquire( text_(host), port, @@ -639,6 +641,7 @@ def connect_upstream(self) -> None: ), ) if self.flags.enable_conn_pool: + assert self.upstream_conn_pool with self.lock: self.upstream_conn_pool.release(self.upstream) raise ProxyConnectionFailed( diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index 2a4d089898..89bbce46aa 100644 --- a/tests/core/test_acceptor.py +++ b/tests/core/test_acceptor.py @@ -101,6 +101,7 @@ def test_accepts_client_from_server_socket( mock_client.return_value, flags=self.flags, event_queue=None, + upstream_conn_pool=None, ) mock_thread.assert_called_with( target=self.flags.work_klass.return_value.run, From 26367241198fde5284a0c665c56f395cf792084a Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 28 Dec 2021 13:30:06 +0530 Subject: [PATCH 4/5] spellcheck --- docs/conf.py | 1 + proxy/core/acceptor/threadless.py | 4 +++- proxy/core/connection/pool.py | 8 ++++---- proxy/plugin/proxy_pool.py | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 8172d57f55..402e263b2d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -300,6 +300,7 @@ (_py_class_role, 'unittest.case.TestCase'), (_py_class_role, 'unittest.result.TestResult'), (_py_class_role, 'UUID'), + (_py_class_role, 'UpstreamConnectionPool'), (_py_class_role, 'Url'), (_py_class_role, 'WebsocketFrame'), (_py_class_role, 'Work'), diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 2f80d50daf..9858712adc 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -87,7 +87,9 @@ def __init__( self.wait_timeout: float = DEFAULT_WAIT_FOR_TASKS_TIMEOUT self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT self._total: int = 0 - self._upstream_conn_pool: UpstreamConnectionPool = UpstreamConnectionPool() + self._upstream_conn_pool: Optional[UpstreamConnectionPool] = None + if self.flags.enable_conn_pool: + self._upstream_conn_pool = UpstreamConnectionPool() @property @abstractmethod diff --git a/proxy/core/connection/pool.py b/proxy/core/connection/pool.py index e66da4a3f5..5f92066b9d 100644 --- a/proxy/core/connection/pool.py +++ b/proxy/core/connection/pool.py @@ -37,7 +37,7 @@ class UpstreamConnectionPool(Work[TcpServerConnection]): """Manages connection pool to upstream servers. - `ConnectionPool` avoids need to reconnect with the upstream + `UpstreamConnectionPool` avoids need to reconnect with the upstream servers repeatedly when a reusable connection is available in the pool. @@ -50,16 +50,16 @@ class UpstreamConnectionPool(Work[TcpServerConnection]): the pool users. Example, if acquired connection is stale, reacquire. - TODO: Ideally, ConnectionPool must be shared across + TODO: Ideally, `UpstreamConnectionPool` must be shared across all cores to make SSL session cache to also work without additional out-of-bound synchronizations. - TODO: ConnectionPool currently WON'T work for + TODO: `UpstreamConnectionPool` currently WON'T work for HTTPS connection. This is because of missing support for session cache, session ticket, abbr TLS handshake and other necessary features to make it work. - NOTE: However, for all HTTP only connections, ConnectionPool + NOTE: However, for all HTTP only connections, `UpstreamConnectionPool` can be used to save upon connection setup time and speed-up performance of requests. """ diff --git a/proxy/plugin/proxy_pool.py b/proxy/plugin/proxy_pool.py index cfc8017820..641d95d622 100644 --- a/proxy/plugin/proxy_pool.py +++ b/proxy/plugin/proxy_pool.py @@ -88,7 +88,7 @@ def before_upstream_connection( must be bootstrapped within it's own re-usable and garbage collected pool, to avoid establishing a new upstream proxy connection for each client request. - See :class:`~proxy.core.connection.pool.ConnectionPool` which is a work + See :class:`~proxy.core.connection.pool.UpstreamConnectionPool` which is a work in progress for SSL cache handling. """ # We don't want to send private IP requests to remote proxies From ff304cec21740700d1268864fd7d51ea8c0c5b52 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 28 Dec 2021 13:40:27 +0530 Subject: [PATCH 5/5] Fix unused import --- proxy/http/plugin.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/proxy/http/plugin.py b/proxy/http/plugin.py index 2a0e8804dc..0180e5f9d9 100644 --- a/proxy/http/plugin.py +++ b/proxy/http/plugin.py @@ -12,13 +12,16 @@ import argparse from abc import ABC, abstractmethod -from typing import Tuple, List, Union, Optional +from typing import Tuple, List, Union, Optional, TYPE_CHECKING from .parser import HttpParser from ..common.types import Readables, Writables from ..core.event import EventQueue -from ..core.connection import TcpClientConnection, UpstreamConnectionPool +from ..core.connection import TcpClientConnection + +if TYPE_CHECKING: + from ..core.connection import UpstreamConnectionPool class HttpProtocolHandlerPlugin(ABC):