From 143d2f081f6f98d5e1430d4f482b9e395d463857 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 10:13:41 +0100 Subject: [PATCH 1/8] fix: Remove redundant lock --- setup.cfg | 1 - src/firebolt/async_db/cursor.py | 28 ++++++++++------------------ 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/setup.cfg b/setup.cfg index cc7dae3c4af..c6f3339eb8d 100755 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,6 @@ install_requires = python-dateutil>=2.8.2 readerwriterlock>=1.0.9 sqlparse>=0.4.2 - tricycle>=0.2.2 trio>=0.22.0 python_requires = >=3.7 include_package_data = True diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 7d2d5e25a33..3e8d108c670 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -17,7 +17,6 @@ ) from httpx import Response, codes -from tricycle import RWLock from firebolt.async_db.util import is_db_available, is_engine_running from firebolt.client import AsyncClient @@ -69,8 +68,6 @@ class Cursor(BaseCursor): """ - __slots__ = BaseCursor.__slots__ + ("_async_query_lock",) - def __init__( self, *args: Any, @@ -79,7 +76,6 @@ def __init__( **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) - self._async_query_lock = RWLock() self._client = client self.connection = connection @@ -392,29 +388,25 @@ async def cancel(self, query_id: str) -> None: @wraps(BaseCursor.fetchone) async def fetchone(self) -> Optional[List[ColType]]: - async with self._async_query_lock.read_locked(): - """Fetch the next row of a query result set.""" - return super().fetchone() + """Fetch the next row of a query result set.""" + return super().fetchone() @wraps(BaseCursor.fetchmany) async def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: - async with self._async_query_lock.read_locked(): - """ - Fetch the next set of rows of a query result; - size is cursor.arraysize by default. 
- """ - return super().fetchmany(size) + """ + Fetch the next set of rows of a query result; + size is cursor.arraysize by default. + """ + return super().fetchmany(size) @wraps(BaseCursor.fetchall) async def fetchall(self) -> List[List[ColType]]: - async with self._async_query_lock.read_locked(): - """Fetch all remaining rows of a query result.""" - return super().fetchall() + """Fetch all remaining rows of a query result.""" + return super().fetchall() @wraps(BaseCursor.nextset) async def nextset(self) -> None: - async with self._async_query_lock.read_locked(): - return super().nextset() + return super().nextset() @check_not_closed def __enter__(self) -> Cursor: From 8e594d86e4c9215e7de2ce21f28156d00e930581 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 16:53:08 +0100 Subject: [PATCH 2/8] Adding doc explanation --- docsrc/Connecting_and_queries.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 5f9a3da1915..939feb7eda0 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -571,6 +571,17 @@ In addition, server-side asynchronous queries can be cancelled calling ``cancel( **Returns**: ``CANCELED_EXECUTION`` +Thread safety +============================== + +Thread safety is set to 2, meaning it's safe to share the module and +:ref:`Connection ` object across threads. +:ref:`Cursor ` is a lightweight object that should be instantiated +by calling ``connection.cursor()`` within a thread and should not be shared across different threads. +Similarly, in an asynchronous context the Cursor object should not be shared across tasks +as it will lead to nondeterministic data being returned. Follow the best practice from the +:ref:`Running multiple queries in parallel`. 
+ Using DATE and DATETIME values ============================== From 9de4565c8858447b7d464dcb3cec0fbae2a9d747 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 17:05:13 +0100 Subject: [PATCH 3/8] fix broken doc link --- docsrc/Connecting_and_queries.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 939feb7eda0..5be71dd22a1 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -349,6 +349,8 @@ It can be extended to run alongside of other operations. Running multiple queries in parallel ------------------------------------ +.. _Running multiple queries in parallel: + Building up on the previous example, we can execute several queries concurently. This is especially useful when queries do not depend on each other and can be run at the same time. From c91ea562cb89afb91637bf857651c6621df846a6 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 17:10:41 +0100 Subject: [PATCH 4/8] try fixing it again --- docsrc/Connecting_and_queries.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 5be71dd22a1..cc2e2cd67ed 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -349,7 +349,7 @@ It can be extended to run alongside of other operations. Running multiple queries in parallel ------------------------------------ -.. _Running multiple queries in parallel: +.. _running_multiple_queries_in_parallel: Building up on the previous example, we can execute several queries concurently. This is especially useful when queries do not depend on each other and can be run @@ -582,7 +582,7 @@ Thread safety is set to 2, meaning it's safe to share the module and by calling ``connection.cursor()`` within a thread and should not be shared across different threads. 
Similarly, in an asynchronous context the Cursor object should not be shared across tasks as it will lead to nondeterministic data being returned. Follow the best practice from the -:ref:`Running multiple queries in parallel`. +:ref:`Running multiple queries in parallel <_running_multiple_queries_in_parallel>`. Using DATE and DATETIME values From 003a846d16e89fdb5f58821815c99f944d19dc33 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 17:14:52 +0100 Subject: [PATCH 5/8] ughhh --- docsrc/Connecting_and_queries.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index cc2e2cd67ed..107d3afa9aa 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -582,7 +582,7 @@ Thread safety is set to 2, meaning it's safe to share the module and by calling ``connection.cursor()`` within a thread and should not be shared across different threads. Similarly, in an asynchronous context the Cursor object should not be shared across tasks as it will lead to nondeterministic data being returned. Follow the best practice from the -:ref:`Running multiple queries in parallel <_running_multiple_queries_in_parallel>`. +:ref:`connecting_and_queries:Running multiple queries in parallel`. Using DATE and DATETIME values From 51e827e8ce70ba10976995f241fc85434e8ae2b0 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 23 Aug 2023 17:19:10 +0100 Subject: [PATCH 6/8] Remove unnecessary anchor --- docsrc/Connecting_and_queries.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 107d3afa9aa..94de9681056 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -349,8 +349,6 @@ It can be extended to run alongside of other operations. Running multiple queries in parallel ------------------------------------ -.. 
_running_multiple_queries_in_parallel: - Building up on the previous example, we can execute several queries concurently. This is especially useful when queries do not depend on each other and can be run at the same time. From af231729397b9b9d718d82437540dc281a7308e8 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 24 Aug 2023 11:45:10 +0100 Subject: [PATCH 7/8] Add test for concurrent auth --- tests/unit/client/test_client_async.py | 49 ++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/tests/unit/client/test_client_async.py b/tests/unit/client/test_client_async.py index a1eefddc970..4095e2bf9df 100644 --- a/tests/unit/client/test_client_async.py +++ b/tests/unit/client/test_client_async.py @@ -1,12 +1,13 @@ -from re import Pattern +from re import Pattern, compile from typing import Callable -from httpx import codes +from httpx import Request, Response, codes from pytest import raises from pytest_httpx import HTTPXMock +from trio import open_nursery from firebolt.client import AsyncClient -from firebolt.client.auth import Auth +from firebolt.client.auth import Auth, ClientCredentials from firebolt.utils.urls import AUTH_SERVICE_ACCOUNT_URL from firebolt.utils.util import fix_url_schema @@ -110,3 +111,45 @@ async def test_client_account_id( api_endpoint=server, ) as c: assert await c.account_id == account_id, "Invalid account id returned." 
+ + +async def test_concurent_auth_lock( + httpx_mock: HTTPXMock, + account_name: str, + server: str, + client_id: str, + client_secret: str, + access_token: str, + auth_url: str, + check_token_callback: Callable, +) -> None: + CONCURENT_COUNT = 10 + url = "https://url" + + checked_creds_times = 0 + + def check_credentials( + request: Request = None, + **kwargs, + ) -> Response: + nonlocal checked_creds_times + checked_creds_times += 1 + return Response( + status_code=codes.OK, + json={"expires_in": 2**32, "access_token": access_token}, + ) + + httpx_mock.add_callback(check_token_callback, url=compile(f"{url}/.")) + httpx_mock.add_callback(check_credentials, url=auth_url) + + async with AsyncClient( + auth=ClientCredentials(client_id, client_secret), + api_endpoint=server, + account_name=account_name, + ) as c: + urls = [f"{url}/{i}" for i in range(CONCURENT_COUNT)] + async with open_nursery() as nursery: + for url in urls: + nursery.start_soon(c.get, url) + + assert checked_creds_times == 1 From 77d8a829f821611feabccc21dbf8ead8fb4007d6 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 25 Aug 2023 14:32:56 +0100 Subject: [PATCH 8/8] Adding lock to auth --- src/firebolt/client/auth/base.py | 31 +++++++++++++++++++++++++- tests/unit/client/test_client_async.py | 11 +++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/firebolt/client/auth/base.py b/src/firebolt/client/auth/base.py index cacd286913e..a4650f0cf21 100644 --- a/src/firebolt/client/auth/base.py +++ b/src/firebolt/client/auth/base.py @@ -1,8 +1,9 @@ from time import time -from typing import Generator, Optional +from typing import AsyncGenerator, Generator, Optional from httpx import Auth as HttpxAuth from httpx import Request, Response, codes +from trio import Lock from firebolt.utils.token_storage import TokenSecureStorage from firebolt.utils.util import Timer, cached_property @@ -35,6 +36,7 @@ def __init__(self, use_token_cache: bool = True): self._use_token_cache = use_token_cache 
self._token: Optional[str] = self._get_cached_token() self._expires: Optional[int] = None + self._lock = Lock() def copy(self) -> "Auth": """Make another auth object with same credentials. @@ -120,3 +122,30 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: yield from self.get_new_token_generator() request.headers["Authorization"] = f"Bearer {self.token}" yield request + + async def async_auth_flow( + self, request: Request + ) -> AsyncGenerator[Request, Response]: + """ + Execute the authentication flow asynchronously. + + Overridden in order to lock and ensure no more than + one authentication request is sent at a time. This + avoids excessive load on the auth server. + """ + if self.requires_request_body: + await request.aread() + + async with self._lock: + flow = self.auth_flow(request) + request = next(flow) + + while True: + response = yield request + if self.requires_response_body: + await response.aread() + + try: + request = flow.send(response) + except StopIteration: + break diff --git a/tests/unit/client/test_client_async.py b/tests/unit/client/test_client_async.py index 4095e2bf9df..cfc1da203a1 100644 --- a/tests/unit/client/test_client_async.py +++ b/tests/unit/client/test_client_async.py @@ -1,10 +1,11 @@ from re import Pattern, compile -from typing import Callable +from types import MethodType +from typing import Any, Callable from httpx import Request, Response, codes from pytest import raises from pytest_httpx import HTTPXMock -from trio import open_nursery +from trio import open_nursery, sleep from firebolt.client import AsyncClient from firebolt.client.auth import Auth, ClientCredentials @@ -128,6 +129,11 @@ async def test_concurent_auth_lock( checked_creds_times = 0 + async def mock_send_handling_redirects(self, *args: Any, **kwargs: Any) -> Response: + # simulate network delay so the context switches + await sleep(0.01) + return await AsyncClient._send_handling_redirects(self, *args, **kwargs) + def 
check_credentials( request: Request = None, **kwargs, @@ -147,6 +153,7 @@ def check_credentials( api_endpoint=server, account_name=account_name, ) as c: + c._send_handling_redirects = MethodType(mock_send_handling_redirects, c) urls = [f"{url}/{i}" for i in range(CONCURENT_COUNT)] async with open_nursery() as nursery: for url in urls: