From e7c1e4741dceadc58b01ed4e1fb1a61c778622f2 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 22 Jan 2025 17:53:43 +0000 Subject: [PATCH 01/19] WIP: async --- src/firebolt/async_db/connection.py | 32 +++++++++++++++++++-- src/firebolt/async_db/cursor.py | 21 ++++++++++++++ src/firebolt/client/constants.py | 2 +- src/firebolt/common/base_connection.py | 4 +++ src/firebolt/common/base_cursor.py | 29 +++++++++++++++++++ src/firebolt/db/connection.py | 40 ++++++++++++++++++++++++-- src/firebolt/db/cursor.py | 28 ++++++++++++++++++ src/firebolt/utils/exception.py | 14 +++++++++ 8 files changed, 165 insertions(+), 5 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index e98b7d1e237..da0f79753eb 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -10,10 +10,19 @@ from firebolt.client import DEFAULT_API_URL from firebolt.client.auth import Auth from firebolt.client.client import AsyncClient, AsyncClientV1, AsyncClientV2 -from firebolt.common.base_connection import BaseConnection +from firebolt.common.base_connection import ( + ASYNC_QUERY_STATUS_REQUEST, + ASYNC_QUERY_STATUS_RUNNING, + ASYNC_QUERY_STATUS_SUCCESSFUL, + BaseConnection, +) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS -from firebolt.utils.exception import ConfigurationError, ConnectionClosedError +from firebolt.utils.exception import ( + ConfigurationError, + ConnectionClosedError, + FireboltError, +) from firebolt.utils.usage_tracker import get_user_agent_header from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 @@ -81,6 +90,25 @@ def cursor(self, **kwargs: Any) -> Cursor: self._cursors.append(c) return c + # Server-side async methods + async def _get_async_query_status(self, token: str) -> str: + cursor = self.cursor() + await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + if cursor.rowcount != 1: + raise FireboltError("Unexpected result from async query status request.") + result = await cursor.fetchone() + columns = cursor.description + result_dict = dict(zip([column.name for column in columns], result)) + return result_dict["status"] + + async def is_async_query_running(self, token: str) -> bool: + status = await self._get_async_query_status(token) + return status == ASYNC_QUERY_STATUS_RUNNING + + async def is_async_query_successful(self, token: str) -> bool: + status = await self._get_async_query_status(token) + return status == ASYNC_QUERY_STATUS_SUCCESSFUL + # Context manager support async def __aenter__(self) -> Connection: if self.closed: diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 0c0385169b2..83c0b0535df 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -392,6 +392,27 @@ def __init__( assert isinstance(client, AsyncClientV2) super().__init__(*args, client=client, connection=connection, **kwargs) + @check_not_closed + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + self._reset() + # Allow users to manually skip parsing for performance improvement. + queries: List[Union[SetParameter, str]] = ( + [query] if skip_parsing else split_format_sql(query, parameters) + ) + for query in queries: + resp = await self._api_request( + query, + {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + ) + await self._raise_if_error(resp) + self._parse_async_response(resp) + return -1 + async def is_db_available(self, database_name: str) -> bool: """ Verify that the database exists. diff --git a/src/firebolt/client/constants.py b/src/firebolt/client/constants.py index 78d23c1bfa8..f5860e83d23 100644 --- a/src/firebolt/client/constants.py +++ b/src/firebolt/client/constants.py @@ -5,7 +5,7 @@ DEFAULT_API_URL: str = "api.app.firebolt.io" PROTOCOL_VERSION_HEADER_NAME = "Firebolt-Protocol-Version" -PROTOCOL_VERSION: str = "2.1" +PROTOCOL_VERSION: str = "2.3" _REQUEST_ERRORS: Tuple[Type, ...] = ( HTTPError, InvalidURL, diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 1d47374015c..4e290f24a97 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -2,6 +2,10 @@ from firebolt.utils.exception import ConnectionClosedError +ASYNC_QUERY_STATUS_RUNNING = "RUNNING" +ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" +ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus('{token}')" + class BaseConnection: def __init__(self) -> None: diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index d521df375a2..1e50cad0faa 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -22,6 +22,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, QueryNotRunError, ) from firebolt.utils.util import Timer, fix_url_schema @@ -85,6 +87,13 @@ def _raise_if_internal_set_parameter(parameter: SetParameter) -> None: ) +@dataclass +class AsyncResponse: + token: str + message: str + monitorSql: str + + @dataclass class Statistics: """ @@ -142,6 +151,9 @@ def check_query_executed(func: Callable) -> Callable: def inner(self: BaseCursor, *args: Any, **kwargs: Any) -> Any: if self._state == CursorState.NONE: raise QueryNotRunError(method_name=func.__name__) + if self._query_token: + # query_token is set only for async queries + raise MethodNotAllowedInAsyncError(method_name=func.__name__) return func(self, *args, **kwargs) return inner @@ -163,6 +175,7 @@ class BaseCursor: "_next_set_idx", "_set_parameters", "_query_id", + "_query_token", "engine_url", ) @@ -184,6 +197,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self._idx = 0 self._next_set_idx = 0 self._query_id = "" + self._query_token = "" self._reset() @property @@ -229,6 +243,15 @@ def query_id(self) -> str: """The query id of a query executed asynchronously.""" return self._query_id + @property + def async_query_token(self) -> str: + """The query token of a query executed asynchronously.""" + if not self._query_token: + raise FireboltError( + "No async query was executed or query was not an async." + ) + return self._query_token + @property def arraysize(self) -> int: """Default number of rows returned by fetchmany.""" @@ -290,6 +313,7 @@ def _reset(self) -> None: self._row_sets = [] self._next_set_idx = 0 self._query_id = "" + self._query_token = "" def _update_set_parameters(self, parameters: Dict[str, Any]) -> None: # Split parameters into immutable and user parameters @@ -334,6 +358,11 @@ def engine_name(self) -> str: return self.parameters["engine"] return URL(self.engine_url).host.split(".")[0].replace("-", "_") + def _parse_async_response(self, response: Response) -> None: + """Handle async response from the server.""" + async_response = AsyncResponse(**response.json()) + self._query_token = async_response.token + def _row_set_from_response(self, response: Response) -> RowSet: """Fetch information about executed query from http response.""" diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 9914e3174a8..96842bd42e2 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -9,12 +9,21 @@ from firebolt.client import DEFAULT_API_URL, Client, ClientV1, ClientV2 from firebolt.client.auth import Auth -from firebolt.common.base_connection import BaseConnection +from firebolt.common.base_connection import ( + ASYNC_QUERY_STATUS_REQUEST, + ASYNC_QUERY_STATUS_RUNNING, + ASYNC_QUERY_STATUS_SUCCESSFUL, + BaseConnection, +) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS from firebolt.db.cursor import Cursor, CursorV1, CursorV2 from firebolt.db.util import _get_system_engine_url_and_params -from firebolt.utils.exception import ConfigurationError, ConnectionClosedError +from firebolt.utils.exception import ( + ConfigurationError, + ConnectionClosedError, + FireboltError, +) from firebolt.utils.usage_tracker import get_user_agent_header from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 @@ -143,6 +152,15 @@ def connect_v2( ) +def ensure_v2(func): + def wrapper(self, *args, **kwargs): + if self.cursor_type != CursorV2: + raise FireboltError("This method is only supported for CursorV2.") + return func(self, *args, **kwargs) + + return wrapper + + class Connection(BaseConnection): """ Firebolt database connection class. Implements PEP-249. @@ -218,6 +236,24 @@ def close(self) -> None: self._client.close() self._is_closed = True + # Server-side async methods + @ensure_v2 + def _get_async_query_status(self, token: str) -> str: + cursor = self.cursor() + cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + if cursor.rowcount != 1: + raise FireboltError("Unexpected result from async query status request.") + result = cursor.fetchone() + columns = cursor.description + result_dict = dict(zip([column.name for column in columns], result)) + return result_dict["status"] + + def is_async_query_running(self, token: str) -> bool: + return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_RUNNING + + def is_async_query_successful(self, token: str) -> bool: + return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_SUCCESSFUL + # Context manager support def __enter__(self) -> Connection: if self.closed: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 86943f9ae8a..a48ecd6dc6f 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -339,6 +339,34 @@ def __init__( assert isinstance(client, ClientV2) # Type check super().__init__(*args, client=client, connection=connection, **kwargs) + @check_not_closed + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + self._reset() + # Allow users to manually skip parsing for performance improvement. + queries: List[Union[SetParameter, str]] = ( + [query] if skip_parsing else split_format_sql(query, parameters) + ) + try: + for query in queries: + resp = self._api_request( + query, + {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + ) + self._raise_if_error(resp) + self._parse_async_response(resp) + + self._state = CursorState.DONE + except Exception: + self._state = CursorState.ERROR + raise + + return -1 + def is_db_available(self, database_name: str) -> bool: """ Verify that the database exists. diff --git a/src/firebolt/utils/exception.py b/src/firebolt/utils/exception.py index c03cb344ab5..457ee8ec16c 100644 --- a/src/firebolt/utils/exception.py +++ b/src/firebolt/utils/exception.py @@ -321,3 +321,17 @@ class QueryTimeoutError(FireboltError, TimeoutError): def __init__(self, message="Query execution timed out."): # type: ignore super().__init__(message) + + +class MethodNotAllowedInAsyncError(FireboltError): + """Method not allowed. + + Exception raised when the method is not allowed. + """ + + def __init__(self, method_name: str): + super().__init__( + f"Method {method_name} not allowed for an async query." + " Please get the token and use the async query API to get the status." + ) + self.method_name = method_name From fbd5cc0e83adfc4037e84193150aab448d046b9d Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 10:45:48 +0000 Subject: [PATCH 02/19] Mypy --- src/firebolt/async_db/connection.py | 16 ++++++++++------ src/firebolt/async_db/cursor.py | 20 ++++++++++++-------- src/firebolt/common/base_connection.py | 6 ++++-- src/firebolt/db/connection.py | 18 ++++++------------ src/firebolt/db/cursor.py | 20 ++++++++++++-------- src/firebolt/utils/util.py | 15 +++++++++++++++ 6 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index da0f79753eb..b36441f5ce7 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -24,7 +24,11 @@ FireboltError, ) from firebolt.utils.usage_tracker import get_user_agent_header -from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 +from firebolt.utils.util import ( + ensure_v2, + fix_url_schema, + validate_engine_name_and_url_v1, +) class Connection(BaseConnection): @@ -72,10 +76,9 @@ def __init__( api_endpoint: str, init_parameters: Optional[Dict[str, Any]] = None, ): - super().__init__() + super().__init__(cursor_type) self.api_endpoint = api_endpoint self.engine_url = engine_url - self.cursor_type = cursor_type self._cursors: List[Cursor] = [] self._client = client self.init_parameters = init_parameters or {} @@ -91,15 +94,16 @@ def cursor(self, **kwargs: Any) -> Cursor: return c # Server-side async methods + @ensure_v2 async def _get_async_query_status(self, token: str) -> str: cursor = self.cursor() await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) - if cursor.rowcount != 1: - raise FireboltError("Unexpected result from async query status request.") result = await cursor.fetchone() + if cursor.rowcount != 1 or not result: + raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) - return result_dict["status"] + return str(result_dict.get("status")) async def is_async_query_running(self, token: str) -> bool: status = await self._get_async_query_status(token) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 83c0b0535df..513bd60b067 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -401,16 +401,20 @@ async def execute_async( ) -> int: self._reset() # Allow users to manually skip parsing for performance improvement. + params_list = [parameters] if parameters else [] queries: List[Union[SetParameter, str]] = ( - [query] if skip_parsing else split_format_sql(query, parameters) + [query] if skip_parsing else split_format_sql(query, params_list) ) - for query in queries: - resp = await self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - ) - await self._raise_if_error(resp) - self._parse_async_response(resp) + for a_query in queries: + if isinstance(a_query, SetParameter): + await self._validate_set_parameter(a_query, None) + else: + resp = await self._api_request( + a_query, + {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + ) + await self._raise_if_error(resp) + self._parse_async_response(resp) return -1 async def is_db_available(self, database_name: str) -> bool: diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 4e290f24a97..af005005f8a 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,5 +1,6 @@ -from typing import Any, List +from typing import Any, List, Type +from firebolt.common.base_cursor import BaseCursor from firebolt.utils.exception import ConnectionClosedError ASYNC_QUERY_STATUS_RUNNING = "RUNNING" @@ -8,7 +9,8 @@ class BaseConnection: - def __init__(self) -> None: + def __init__(self, cursor_type: Type[BaseCursor]) -> None: + self.cursor_type = cursor_type self._cursors: List[Any] = [] self._is_closed = False diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 96842bd42e2..792ba3d21c5 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -25,7 +25,11 @@ FireboltError, ) from firebolt.utils.usage_tracker import get_user_agent_header -from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 +from firebolt.utils.util import ( + ensure_v2, + fix_url_schema, + validate_engine_name_and_url_v1, +) logger = logging.getLogger(__name__) @@ -152,15 +156,6 @@ def connect_v2( ) -def ensure_v2(func): - def wrapper(self, *args, **kwargs): - if self.cursor_type != CursorV2: - raise FireboltError("This method is only supported for CursorV2.") - return func(self, *args, **kwargs) - - return wrapper - - class Connection(BaseConnection): """ Firebolt database connection class. Implements PEP-249. @@ -199,10 +194,9 @@ def __init__( api_endpoint: str = DEFAULT_API_URL, init_parameters: Optional[Dict[str, Any]] = None, ): - super().__init__() + super().__init__(cursor_type) self.api_endpoint = api_endpoint self.engine_url = engine_url - self.cursor_type = cursor_type self._cursors: List[Cursor] = [] self._client = client self.init_parameters = init_parameters or {} diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index a48ecd6dc6f..dd90c385852 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -348,17 +348,21 @@ def execute_async( ) -> int: self._reset() # Allow users to manually skip parsing for performance improvement. + params_list = [parameters] if parameters else [] queries: List[Union[SetParameter, str]] = ( - [query] if skip_parsing else split_format_sql(query, parameters) + [query] if skip_parsing else split_format_sql(query, params_list) ) try: - for query in queries: - resp = self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - ) - self._raise_if_error(resp) - self._parse_async_response(resp) + for a_query in queries: + if isinstance(a_query, SetParameter): + self._validate_set_parameter(a_query, None) + else: + resp = self._api_request( + a_query, + {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + ) + self._raise_if_error(resp) + self._parse_async_response(resp) self._state = CursorState.DONE except Exception: diff --git a/src/firebolt/utils/util.py b/src/firebolt/utils/util.py index d493951006f..2228ae75431 100644 --- a/src/firebolt/utils/util.py +++ b/src/firebolt/utils/util.py @@ -5,6 +5,7 @@ from types import TracebackType from typing import ( TYPE_CHECKING, + Any, Callable, Dict, Optional, @@ -16,8 +17,11 @@ from httpx import URL, Response, codes +from firebolt.async_db.cursor import CursorV2 +from firebolt.common.base_connection import BaseConnection from firebolt.utils.exception import ( ConfigurationError, + FireboltError, FireboltStructuredError, ) @@ -237,3 +241,14 @@ def parse_url_and_params(url: str) -> Tuple[str, Dict[str, str]]: raise ValueError(f"Multiple values found for key '{key}'") query_params_dict[key] = values[0] return result_url, query_params_dict + + +def ensure_v2(func: Callable) -> Callable: + """Decorator to ensure that the method is only called for CursorV2.""" + + def wrapper(self: BaseConnection, *args: Any, **kwargs: Any) -> Any: + if self.cursor_type != CursorV2: + raise FireboltError("This method is only supported for CursorV2.") + return func(self, *args, **kwargs) + + return wrapper From a47036e52bd734ca91b1dc7af34ce795b7a4d87f Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 11:32:00 +0000 Subject: [PATCH 03/19] fix circular import --- src/firebolt/async_db/connection.py | 7 ++----- src/firebolt/common/base_connection.py | 20 ++++++++++++++++---- src/firebolt/db/connection.py | 7 ++----- src/firebolt/utils/util.py | 15 --------------- 4 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index b36441f5ce7..6cc78092057 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -15,6 +15,7 @@ ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, BaseConnection, + ensure_v2, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -24,11 +25,7 @@ FireboltError, ) from firebolt.utils.usage_tracker import get_user_agent_header -from firebolt.utils.util import ( - ensure_v2, - fix_url_schema, - validate_engine_name_and_url_v1, -) +from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 class Connection(BaseConnection): diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index af005005f8a..4decb361f9c 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,15 +1,27 @@ -from typing import Any, List, Type +from typing import Any, Callable, List, Type -from firebolt.common.base_cursor import BaseCursor -from firebolt.utils.exception import ConnectionClosedError +# from firebolt.common.base_cursor import BaseCursor +from firebolt.async_db.cursor import CursorV2 +from firebolt.utils.exception import ConnectionClosedError, FireboltError ASYNC_QUERY_STATUS_RUNNING = "RUNNING" ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus('{token}')" +def ensure_v2(func: Callable) -> Callable: + """Decorator to ensure that the method is only called for CursorV2.""" + + def wrapper(self: BaseConnection, *args: Any, **kwargs: Any) -> Any: + if self.cursor_type != CursorV2: + raise FireboltError("This method is only supported for CursorV2.") + return func(self, *args, **kwargs) + + return wrapper + + class BaseConnection: - def __init__(self, cursor_type: Type[BaseCursor]) -> None: + def __init__(self, cursor_type: Type) -> None: self.cursor_type = cursor_type self._cursors: List[Any] = [] self._is_closed = False diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 792ba3d21c5..6ded180d57e 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -14,6 +14,7 @@ ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, BaseConnection, + ensure_v2, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -25,11 +26,7 @@ FireboltError, ) from firebolt.utils.usage_tracker import get_user_agent_header -from firebolt.utils.util import ( - ensure_v2, - fix_url_schema, - validate_engine_name_and_url_v1, -) +from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 logger = logging.getLogger(__name__) diff --git a/src/firebolt/utils/util.py b/src/firebolt/utils/util.py index 2228ae75431..d493951006f 100644 --- a/src/firebolt/utils/util.py +++ b/src/firebolt/utils/util.py @@ -5,7 +5,6 @@ from types import TracebackType from typing import ( TYPE_CHECKING, - Any, Callable, Dict, Optional, @@ -17,11 +16,8 @@ from httpx import URL, Response, codes -from firebolt.async_db.cursor import CursorV2 -from firebolt.common.base_connection import BaseConnection from firebolt.utils.exception import ( ConfigurationError, - FireboltError, FireboltStructuredError, ) @@ -241,14 +237,3 @@ def parse_url_and_params(url: str) -> Tuple[str, Dict[str, str]]: raise ValueError(f"Multiple values found for key '{key}'") query_params_dict[key] = values[0] return result_url, query_params_dict - - -def ensure_v2(func: Callable) -> Callable: - """Decorator to ensure that the method is only called for CursorV2.""" - - def wrapper(self: BaseConnection, *args: Any, **kwargs: Any) -> Any: - if self.cursor_type != CursorV2: - raise FireboltError("This method is only supported for CursorV2.") - return func(self, *args, **kwargs) - - return wrapper From f78469581de25e1267f7ff63cdc9d097eb65affb Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 11:50:40 +0000 Subject: [PATCH 04/19] better fix --- src/firebolt/async_db/connection.py | 6 ++++-- src/firebolt/common/base_connection.py | 17 ++--------------- src/firebolt/db/connection.py | 6 ++++-- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 6cc78092057..468c7c5b165 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -15,7 +15,6 @@ ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, BaseConnection, - ensure_v2, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -91,8 +90,11 @@ def cursor(self, **kwargs: Any) -> Cursor: return c # Server-side async methods - @ensure_v2 async def _get_async_query_status(self, token: str) -> str: + if self.cursor_type != CursorV2: + raise FireboltError( + "This method is only supported for connection with service account." + ) cursor = self.cursor() await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) result = await cursor.fetchone() diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 4decb361f9c..80be32d4898 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,25 +1,12 @@ -from typing import Any, Callable, List, Type +from typing import Any, List, Type -# from firebolt.common.base_cursor import BaseCursor -from firebolt.async_db.cursor import CursorV2 -from firebolt.utils.exception import ConnectionClosedError, FireboltError +from firebolt.utils.exception import ConnectionClosedError ASYNC_QUERY_STATUS_RUNNING = "RUNNING" ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus('{token}')" -def ensure_v2(func: Callable) -> Callable: - """Decorator to ensure that the method is only called for CursorV2.""" - - def wrapper(self: BaseConnection, *args: Any, **kwargs: Any) -> Any: - if self.cursor_type != CursorV2: - raise FireboltError("This method is only supported for CursorV2.") - return func(self, *args, **kwargs) - - return wrapper - - class BaseConnection: def __init__(self, cursor_type: Type) -> None: self.cursor_type = cursor_type diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 6ded180d57e..71aeed836db 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -14,7 +14,6 @@ ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, BaseConnection, - ensure_v2, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -228,8 +227,11 @@ def close(self) -> None: self._is_closed = True # Server-side async methods - @ensure_v2 def _get_async_query_status(self, token: str) -> str: + if self.cursor_type != CursorV2: + raise FireboltError( + "This method is only supported for connection with service account." + ) cursor = self.cursor() cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) if cursor.rowcount != 1: From 3b591d97f8e18fdd61a64341a3cecd92d4c6fef2 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 12:15:30 +0000 Subject: [PATCH 05/19] integration tests --- tests/integration/dbapi/sync/V2/conftest.py | 23 +++++++- .../dbapi/sync/V2/test_server_async.py | 55 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 tests/integration/dbapi/sync/V2/test_server_async.py diff --git a/tests/integration/dbapi/sync/V2/conftest.py b/tests/integration/dbapi/sync/V2/conftest.py index 96a3c19eb5c..a116657d4f1 100644 --- a/tests/integration/dbapi/sync/V2/conftest.py +++ b/tests/integration/dbapi/sync/V2/conftest.py @@ -1,6 +1,6 @@ import random import string -from typing import Tuple +from typing import Any, Callable, Tuple from pytest import fixture @@ -28,6 +28,27 @@ def connection( yield connection +@fixture +def connection_factory( + engine_name: str, + database_name: str, + auth: Auth, + account_name: str, + api_endpoint: str, +) -> Callable[..., Connection]: + def factory(**kwargs: Any) -> Connection: + return connect( + engine_name=engine_name, + database=database_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + **kwargs, + ) + + return factory + + @fixture def connection_no_db( engine_name: str, diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py new file mode 100644 index 00000000000..a92bb30b5db --- /dev/null +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -0,0 +1,55 @@ +import time +from random import randint +from typing import Callable + +from firebolt.db import Connection + + +def test_insert_async(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection.is_async_query_running(token) == False + assert connection.is_async_query_successful(token) == True + # Verify the result + cursor = connection.cursor() + cursor.execute(f"SELECT * FROM {table_name}") + result = cursor.fetchall() + assert result == [[1, "test"]] + finally: + cursor.execute(f"DROP TABLE {table_name}") + + +def test_check_async_execution_from_another_connection( + connection_factory: Callable[..., Connection] +) -> None: + connection_1 = connection_factory() + connection_2 = connection_factory() + cursor = connection_1.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection_2.is_async_query_running(token) == False + assert connection_2.is_async_query_successful(token) == True + # Verify the result + cursor = connection_2.cursor() + cursor.execute(f"SELECT * FROM {table_name}") + result = cursor.fetchall() + assert result == [[1, "test"]] + finally: + cursor.execute(f"DROP TABLE {table_name}") + connection_1.close() + connection_2.close() From 595ae9dd4f0416fcd98a718636eebc333b96f3c9 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 15:10:56 +0000 Subject: [PATCH 06/19] connection tests --- src/firebolt/db/connection.py | 4 +- tests/unit/async_db/test_connection.py | 88 +++++++++++++++++++++++++- tests/unit/db/test_connection.py | 88 +++++++++++++++++++++++++- tests/unit/db_conftest.py | 87 ++++++++++++++++++++++++- 4 files changed, 262 insertions(+), 5 deletions(-) diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 71aeed836db..1ea1cfa2bab 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -234,9 +234,9 @@ def _get_async_query_status(self, token: str) -> str: ) cursor = self.cursor() cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) - if cursor.rowcount != 1: - raise FireboltError("Unexpected result from async query status request.") result = cursor.fetchone() + if cursor.rowcount != 1 or not result: + raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) return result_dict["status"] diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 183eb67621c..07e81971e36 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -1,4 +1,4 @@ -from typing import Callable, List +from typing import Callable, List, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -406,3 +406,89 @@ async def test_connect_no_user_agent( ) as connection: await connection.cursor().execute("select*") ut.assert_called_with([], []) + + +@mark.parametrize( + "server_status,expected_running,expected_success", + [ + ("RUNNING", True, False), + ("ENDED_SUCCESSFULLY", False, True), + ("FAILED", False, False), + ("CANCELLED", False, False), + ], +) +async def test_is_async_query_running_success( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, + server_status: str, + expected_running: bool, + expected_success: bool, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_data[0][5] = server_status + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + assert await connection.is_async_query_running("token") is expected_running + assert await connection.is_async_query_successful("token") is expected_success + + +async def test_async_query_status_unexpected_result( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_status_running_callback = async_query_callback_factory( + [], async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + await connection.is_async_query_running("token") + with raises(FireboltError): + await connection.is_async_query_successful("token") diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index d874079aff8..f36694e8829 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -1,6 +1,6 @@ import gc import warnings -from typing import Callable, List +from typing import Callable, List, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -424,3 +424,89 @@ def test_connect_no_user_agent( ) as connection: connection.cursor().execute("select*") ut.assert_called_with([], []) + + +@mark.parametrize( + "server_status,expected_running,expected_success", + [ + ("RUNNING", True, False), + ("ENDED_SUCCESSFULLY", False, True), + ("FAILED", False, False), + ("CANCELLED", False, False), + ], +) +def test_is_async_query_running_success( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, + server_status: str, + expected_running: bool, + expected_success: bool, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_data[0][5] = server_status + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + assert connection.is_async_query_running("token") is expected_running + assert connection.is_async_query_successful("token") is expected_success + + +def test_async_query_status_unexpected_result( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_status_running_callback = async_query_callback_factory( + [], async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + connection.is_async_query_running("token") + with raises(FireboltError): + connection.is_async_query_successful("token") diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index f1e20b9c495..a7cf01bd577 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -1,7 +1,7 @@ from datetime import date, datetime from decimal import Decimal from json import dumps as jdumps -from typing import Any, Callable, Dict, List +from typing import Any, Callable, Dict, List, Tuple from httpx import URL, Request, codes from pytest import fixture @@ -523,3 +523,88 @@ def types_map() -> Dict[str, type]: **struct, **nested_struct, } + + +@fixture +def async_query_data() -> List[List[ColType]]: + query_data = [ + [ + "developer", + "ecosystem_ci", + "2025-01-23 14:08:06.087953+00", + "2025-01-23 14:08:06.134208+00", + "2025-01-23 14:08:06.410542+00", + "ENDED_SUCCESSFULLY", + "db4c7542-3058-4e2a-9d49-ae5ea2da3cbe", + "f9520387-224c-48e9-9858-b2d05518ce94", + "", + "2", + "2", + "0", + ] + ] + return query_data + + +@fixture +def async_query_meta() -> List[Tuple[str, str]]: + query_meta = [ + ("account_name", "text null"), + ("user_name", "text null"), + ("submitted_time", "timestamptz null"), + ("start_time", "timestamptz null"), + ("end_time", "timestamptz null"), + ("status", "text null"), + ("request_id", "text null"), + ("query_id", "text null"), + ("error_message", "text null"), + ("scanned_bytes", "long null"), + ("scanned_rows", "long null"), + ("retries", "long null"), + ] + return query_meta + + +@fixture +def async_query_callback_factory( + query_statistics: Dict[str, Any], +) -> Callable: + def create_callback( + data: List[List[ColType]], meta: List[Tuple[str, str]] + ) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + query_response = { + "meta": [{"name": c[0], "type": c[1]} for c in meta], + "data": data, + "rows": len(data), + "statistics": query_statistics, + } + return Response(status_code=codes.OK, json=query_response) + + return do_query + + return create_callback + + +@fixture +def async_query_status_running_callback( + query_statistics: Dict[str, Any], + query_data: List[List[ColType]], + query_meta: List[Tuple[str, str]], +) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + query_response = { + "meta": [{"name": c[0], "type": c[1]} for c in query_meta], + "data": query_data, + "rows": len(query_data), + "statistics": query_statistics, + } + return Response(status_code=codes.OK, json=query_response) + + return do_query From 84d7d607e04917181723c52dc93ca63eab0bdc50 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 23 Jan 2025 17:40:53 +0000 Subject: [PATCH 07/19] cursor tests --- src/firebolt/async_db/cursor.py | 18 ++++++- src/firebolt/db/cursor.py | 28 ++++++---- tests/unit/async_db/test_cursor.py | 82 ++++++++++++++++++++++++++++++ tests/unit/db/test_cursor.py | 82 ++++++++++++++++++++++++++++++ tests/unit/db_conftest.py | 34 +++++++++++++ 5 files changed, 232 insertions(+), 12 deletions(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 513bd60b067..aeecab29f39 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -49,6 +49,7 @@ from firebolt.utils.exception import ( EngineNotRunningError, FireboltDatabaseError, + FireboltError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -405,9 +406,17 @@ async def execute_async( queries: List[Union[SetParameter, str]] = ( [query] if skip_parsing else split_format_sql(query, params_list) ) - for a_query in queries: + if len(queries) > 1: + raise FireboltError( + "execute_async does not support multi-statement queries" + ) + a_query = queries[0] + try: if isinstance(a_query, SetParameter): - await self._validate_set_parameter(a_query, None) + raise FireboltError( + "execute_async does not support set statements, " + "please use execute to set this parameter" + ) else: resp = await self._api_request( a_query, @@ -415,6 +424,11 @@ async def execute_async( ) await self._raise_if_error(resp) self._parse_async_response(resp) + self._state = CursorState.DONE + except Exception: + self._state = CursorState.ERROR + raise + return -1 async def is_db_available(self, database_name: str) -> bool: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index dd90c385852..bdfd5670d17 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -47,6 +47,7 @@ from firebolt.utils.exception import ( EngineNotRunningError, FireboltDatabaseError, + FireboltError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -352,17 +353,24 @@ def execute_async( queries: List[Union[SetParameter, str]] = ( [query] if skip_parsing else split_format_sql(query, params_list) ) + if len(queries) > 1: + raise FireboltError( + "execute_async does not support multi-statement queries" + ) + a_query = queries[0] try: - for a_query in queries: - if isinstance(a_query, SetParameter): - self._validate_set_parameter(a_query, None) - else: - resp = self._api_request( - a_query, - {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - ) - self._raise_if_error(resp) - self._parse_async_response(resp) + if isinstance(a_query, SetParameter): + raise FireboltError( + "execute_async does not support set statements, " + "please use execute to set this parameter" + ) + else: + resp = self._api_request( + a_query, + {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + ) + self._raise_if_error(resp) + self._parse_async_response(resp) self._state = CursorState.DONE except Exception: diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index 14ced6794d5..3adbafea2c5 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -13,6 +13,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, OperationalError, ProgrammingError, QueryNotRunError, @@ -772,3 +774,83 @@ def long_query_callback(request: Request, **kwargs) -> Response: assert fast_executed is False, "fast query was not executed" httpx_mock.reset(False) + + +async def verify_async_fetch_not_allowed(cursor: Cursor): + with raises(MethodNotAllowedInAsyncError): + await cursor.fetchall() + with raises(MethodNotAllowedInAsyncError): + await cursor.fetchone() + with raises(MethodNotAllowedInAsyncError): + await cursor.fetchmany() + + +async def test_cursor_execute_async( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2") + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_multiple_queries( + cursor: Cursor, +): + with raises(FireboltError) as e: + await cursor.execute_async("SELECT 2; SELECT 3") + assert "does not support multi-statement" in str(e.value) + + +async def test_cursor_execute_async_parametrised_query( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2 WHERE x = ?", [1]) + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_skip_parsing( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2; SELECT 3", skip_parsing=True) + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_validate_set_parameters( + cursor: Cursor, +): + with raises(FireboltError) as e: + await cursor.execute_async("SET a = b") + assert "does not support set" in str(e.value) + + +async def test_cursor_execute_async_respects_api_errors( + httpx_mock: HTTPXMock, + async_query_url: str, + cursor: Cursor, +): + httpx_mock.add_callback( + lambda *args, **kwargs: Response(status_code=codes.BAD_REQUEST), + url=async_query_url, + ) + with raises(HTTPStatusError): + await cursor.execute_async("SELECT 2") diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index 4f4a5c60ea6..a7a9b2333d2 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -13,6 +13,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, OperationalError, ProgrammingError, QueryNotRunError, @@ -757,3 +759,83 @@ def long_query_callback(request: Request, **kwargs) -> Response: assert fast_executed is False, "fast query was not executed" httpx_mock.reset(False) + + +def verify_async_fetch_not_allowed(cursor: Cursor): + with raises(MethodNotAllowedInAsyncError): + cursor.fetchall() + with raises(MethodNotAllowedInAsyncError): + cursor.fetchone() + with raises(MethodNotAllowedInAsyncError): + cursor.fetchmany() + + +def test_cursor_execute_async( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2") + verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_multiple_queries( + cursor: Cursor, +): + with raises(FireboltError) as e: + cursor.execute_async("SELECT 2; SELECT 3") + assert "does not support multi-statement" in str(e.value) + + +def test_cursor_execute_async_parametrised_query( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2 WHERE x = ?", [1]) + verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_skip_parsing( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2; SELECT 3", skip_parsing=True) + verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_validate_set_parameters( + cursor: Cursor, +): + with raises(FireboltError) as e: + cursor.execute_async("SET a = b") + assert "does not support set" in str(e.value) + + +def test_cursor_execute_async_respects_api_errors( + httpx_mock: HTTPXMock, + async_query_url: str, + cursor: Cursor, +): + httpx_mock.add_callback( + lambda *args, **kwargs: Response(status_code=codes.BAD_REQUEST), + url=async_query_url, + ) + with raises(HTTPStatusError): + cursor.execute_async("SELECT 2") diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index a7cf01bd577..cb58fee7d1a 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -246,6 +246,18 @@ def query_url(engine_url: str, db_name: str) -> URL: ) +@fixture +def async_query_url(engine_url: str, db_name: str) -> URL: + return URL( + f"https://{engine_url}/", + params={ + "output_format": JSON_OUTPUT_FORMAT, + "database": db_name, + "async": "true", + }, + ) + + @fixture def query_url_updated(engine_url: str, db_name_updated: str) -> URL: return URL( @@ -608,3 +620,25 @@ def do_query(request: Request, **kwargs) -> Response: return Response(status_code=codes.OK, json=query_response) return do_query + + +@fixture +def async_token() -> str: + return "async_token" + + +@fixture +def async_query_callback(async_token: str) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + assert "async=true" in str(request.url) + query_response = { + "message": "the query was accepted for async processing", + "monitorSql": "CALL fb_GetAsyncStatus('token');", + "token": async_token, + } + return Response(status_code=codes.ACCEPTED, json=query_response) + + return do_query From c285fe8898b47fd3d080a787fdf1a3fdbdc2ac04 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 10:51:13 +0000 Subject: [PATCH 08/19] refactor to reduce duplication and complexity --- src/firebolt/async_db/cursor.py | 84 ++++++++++------ src/firebolt/db/cursor.py | 170 ++++++++++++++++++++------------ 2 files changed, 161 insertions(+), 93 deletions(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index aeecab29f39..10408d6494c 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -8,6 +8,7 @@ from typing import ( TYPE_CHECKING, Any, + Dict, Iterator, List, Optional, @@ -39,7 +40,6 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - RowSet, _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, @@ -193,47 +193,75 @@ async def _do_execute( parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, timeout: Optional[float] = None, + async_execution: bool = False, ) -> None: self._reset() - # Allow users to manually skip parsing for performance improvement. queries: List[Union[SetParameter, str]] = ( [raw_query] if skip_parsing else split_format_sql(raw_query, parameters) ) timeout_controller = TimeoutController(timeout) + + if len(queries) > 1 and async_execution: + raise FireboltError( + "execute_async does not support multi-statement queries" + ) try: for query in queries: - start_time = time.time() - Cursor._log_query(query) - timeout_controller.raise_if_timeout() - - if isinstance(query, SetParameter): - row_set: RowSet = (-1, None, None, None) - await self._validate_set_parameter( - query, timeout_controller.remaining() - ) - else: - resp = await self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT}, - timeout=timeout_controller.remaining(), - ) - await self._raise_if_error(resp) - await self._parse_response_headers(resp.headers) - row_set = self._row_set_from_response(resp) - - self._append_row_set(row_set) - - logger.info( - f"Query fetched {self.rowcount} rows in" - f" {time.time() - start_time} seconds." + await self._execute_single_query( + query, timeout_controller, async_execution ) - self._state = CursorState.DONE - except Exception: self._state = CursorState.ERROR raise + async def _execute_single_query( + self, + query: Union[SetParameter, str], + timeout_controller: TimeoutController, + async_execution: bool, + ) -> None: + start_time = time.time() + Cursor._log_query(query) + timeout_controller.raise_if_timeout() + + if isinstance(query, SetParameter): + if async_execution: + raise FireboltError( + "execute_async does not support set statements, " + "please use execute to set this parameter" + ) + await self._validate_set_parameter(query, timeout_controller.remaining()) + else: + await self._handle_query_execution( + query, timeout_controller, async_execution + ) + + if not async_execution: + logger.info( + f"Query fetched {self.rowcount} rows in" + f" {time.time() - start_time} seconds." + ) + + async def _handle_query_execution( + self, query: str, timeout_controller: TimeoutController, async_execution: bool + ) -> None: + query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT} + if async_execution: + query_params["async"] = True + resp = await self._api_request( + query, + query_params, + timeout=timeout_controller.remaining(), + ) + await self._raise_if_error(resp) + if async_execution: + self._parse_async_response(resp) + else: + await self._parse_response_headers(resp.headers) + row_set = self._row_set_from_response(resp) + self._append_row_set(row_set) + @check_not_closed async def execute( self, diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index bdfd5670d17..41e9f88b1cb 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -6,6 +6,7 @@ from typing import ( TYPE_CHECKING, Any, + Dict, Generator, List, Optional, @@ -37,7 +38,6 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - RowSet, _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, @@ -185,52 +185,92 @@ def _parse_response_headers(self, headers: Headers) -> None: param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER)) self._update_set_parameters(param_dict) + @check_not_closed + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + self._do_execute( + query, + [parameters] if parameters else [], + skip_parsing, + async_execution=True, + ) + return -1 + def _do_execute( self, raw_query: str, parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, timeout: Optional[float] = None, + async_execution: bool = False, ) -> None: self._reset() - # Allow users to manually skip parsing for performance improvement. queries: List[Union[SetParameter, str]] = ( [raw_query] if skip_parsing else split_format_sql(raw_query, parameters) ) timeout_controller = TimeoutController(timeout) + if len(queries) > 1 and async_execution: + raise FireboltError( + "execute_async does not support multi-statement queries" + ) try: for query in queries: - start_time = time.time() - Cursor._log_query(query) - timeout_controller.raise_if_timeout() - - if isinstance(query, SetParameter): - row_set: RowSet = (-1, None, None, None) - self._validate_set_parameter(query, timeout_controller.remaining()) - else: - resp = self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT}, - timeout=timeout_controller.remaining(), - ) - self._raise_if_error(resp) - self._parse_response_headers(resp.headers) - row_set = self._row_set_from_response(resp) - - self._append_row_set(row_set) - - logger.info( - f"Query fetched {self.rowcount} rows in" - f" {time.time() - start_time} seconds." - ) - + self._execute_single_query(query, timeout_controller, async_execution) self._state = CursorState.DONE - except Exception: self._state = CursorState.ERROR raise + def _execute_single_query( + self, + query: Union[SetParameter, str], + timeout_controller: TimeoutController, + async_execution: bool, + ) -> None: + start_time = time.time() + Cursor._log_query(query) + timeout_controller.raise_if_timeout() + + if isinstance(query, SetParameter): + if async_execution: + raise FireboltError( + "execute_async does not support set statements, " + "please use execute to set this parameter" + ) + self._validate_set_parameter(query, timeout_controller.remaining()) + else: + self._handle_query_execution(query, timeout_controller, async_execution) + + if not async_execution: + logger.info( + f"Query fetched {self.rowcount} rows in" + f" {time.time() - start_time} seconds." + ) + + def _handle_query_execution( + self, query: str, timeout_controller: TimeoutController, async_execution: bool + ) -> None: + query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT} + if async_execution: + query_params["async"] = True + resp = self._api_request( + query, + query_params, + timeout=timeout_controller.remaining(), + ) + self._raise_if_error(resp) + if async_execution: + self._parse_async_response(resp) + else: + self._parse_response_headers(resp.headers) + row_set = self._row_set_from_response(resp) + self._append_row_set(row_set) + @check_not_closed def execute( self, @@ -340,44 +380,44 @@ def __init__( assert isinstance(client, ClientV2) # Type check super().__init__(*args, client=client, connection=connection, **kwargs) - @check_not_closed - def execute_async( - self, - query: str, - parameters: Optional[Sequence[ParameterType]] = None, - skip_parsing: bool = False, - ) -> int: - self._reset() - # Allow users to manually skip parsing for performance improvement. - params_list = [parameters] if parameters else [] - queries: List[Union[SetParameter, str]] = ( - [query] if skip_parsing else split_format_sql(query, params_list) - ) - if len(queries) > 1: - raise FireboltError( - "execute_async does not support multi-statement queries" - ) - a_query = queries[0] - try: - if isinstance(a_query, SetParameter): - raise FireboltError( - "execute_async does not support set statements, " - "please use execute to set this parameter" - ) - else: - resp = self._api_request( - a_query, - {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - ) - self._raise_if_error(resp) - self._parse_async_response(resp) - - self._state = CursorState.DONE - except Exception: - self._state = CursorState.ERROR - raise - - return -1 + # @check_not_closed + # def execute_async( + # self, + # query: str, + # parameters: Optional[Sequence[ParameterType]] = None, + # skip_parsing: bool = False, + # ) -> int: + # self._reset() + # # Allow users to manually skip parsing for performance improvement. + # params_list = [parameters] if parameters else [] + # queries: List[Union[SetParameter, str]] = ( + # [query] if skip_parsing else split_format_sql(query, params_list) + # ) + # if len(queries) > 1: + # raise FireboltError( + # "execute_async does not support multi-statement queries" + # ) + # a_query = queries[0] + # try: + # if isinstance(a_query, SetParameter): + # raise FireboltError( + # "execute_async does not support set statements, " + # "please use execute to set this parameter" + # ) + # else: + # resp = self._api_request( + # a_query, + # {"output_format": JSON_OUTPUT_FORMAT, "async": True}, + # ) + # self._raise_if_error(resp) + # self._parse_async_response(resp) + + # self._state = CursorState.DONE + # except Exception: + # self._state = CursorState.ERROR + # raise + + # return -1 def is_db_available(self, database_name: str) -> bool: """ From 1e7d431656761ed0da74d8aca3577887cfb0bc9b Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 11:02:01 +0000 Subject: [PATCH 09/19] cleanup --- src/firebolt/async_db/cursor.py | 37 ++++++---------------------- src/firebolt/db/cursor.py | 43 ++------------------------------- 2 files changed, 9 insertions(+), 71 deletions(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 10408d6494c..e5d264c0049 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -203,7 +203,7 @@ async def _do_execute( if len(queries) > 1 and async_execution: raise FireboltError( - "execute_async does not support multi-statement queries" + "Server side async does not support multi-statement queries" ) try: for query in queries: @@ -228,7 +228,7 @@ async def _execute_single_query( if isinstance(query, SetParameter): if async_execution: raise FireboltError( - "execute_async does not support set statements, " + "Server side async does not support set statements, " "please use execute to set this parameter" ) await self._validate_set_parameter(query, timeout_controller.remaining()) @@ -428,35 +428,12 @@ async def execute_async( parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, ) -> int: - self._reset() - # Allow users to manually skip parsing for performance improvement. - params_list = [parameters] if parameters else [] - queries: List[Union[SetParameter, str]] = ( - [query] if skip_parsing else split_format_sql(query, params_list) + await self._do_execute( + query, + [parameters] if parameters else [], + skip_parsing, + async_execution=True, ) - if len(queries) > 1: - raise FireboltError( - "execute_async does not support multi-statement queries" - ) - a_query = queries[0] - try: - if isinstance(a_query, SetParameter): - raise FireboltError( - "execute_async does not support set statements, " - "please use execute to set this parameter" - ) - else: - resp = await self._api_request( - a_query, - {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - ) - await self._raise_if_error(resp) - self._parse_async_response(resp) - self._state = CursorState.DONE - except Exception: - self._state = CursorState.ERROR - raise - return -1 async def is_db_available(self, database_name: str) -> bool: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 41e9f88b1cb..4678f5afe3a 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -216,7 +216,7 @@ def _do_execute( if len(queries) > 1 and async_execution: raise FireboltError( - "execute_async does not support multi-statement queries" + "Server side async does not support multi-statement queries" ) try: for query in queries: @@ -239,7 +239,7 @@ def _execute_single_query( if isinstance(query, SetParameter): if async_execution: raise FireboltError( - "execute_async does not support set statements, " + "Server side async does not support set statements, " "please use execute to set this parameter" ) self._validate_set_parameter(query, timeout_controller.remaining()) @@ -380,45 +380,6 @@ def __init__( assert isinstance(client, ClientV2) # Type check super().__init__(*args, client=client, connection=connection, **kwargs) - # @check_not_closed - # def execute_async( - # self, - # query: str, - # parameters: Optional[Sequence[ParameterType]] = None, - # skip_parsing: bool = False, - # ) -> int: - # self._reset() - # # Allow users to manually skip parsing for performance improvement. - # params_list = [parameters] if parameters else [] - # queries: List[Union[SetParameter, str]] = ( - # [query] if skip_parsing else split_format_sql(query, params_list) - # ) - # if len(queries) > 1: - # raise FireboltError( - # "execute_async does not support multi-statement queries" - # ) - # a_query = queries[0] - # try: - # if isinstance(a_query, SetParameter): - # raise FireboltError( - # "execute_async does not support set statements, " - # "please use execute to set this parameter" - # ) - # else: - # resp = self._api_request( - # a_query, - # {"output_format": JSON_OUTPUT_FORMAT, "async": True}, - # ) - # self._raise_if_error(resp) - # self._parse_async_response(resp) - - # self._state = CursorState.DONE - # except Exception: - # self._state = CursorState.ERROR - # raise - - # return -1 - def is_db_available(self, database_name: str) -> bool: """ Verify that the database exists. From bc3013b5a9573f2f54fe0cac9f8e1c92345b986c Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 11:27:11 +0000 Subject: [PATCH 10/19] add running test --- src/firebolt/db/connection.py | 7 +++++-- .../dbapi/sync/V2/test_server_async.py | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 1ea1cfa2bab..effed1fff5a 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -244,8 +244,11 @@ def _get_async_query_status(self, token: str) -> str: def is_async_query_running(self, token: str) -> bool: return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_RUNNING - def is_async_query_successful(self, token: str) -> bool: - return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_SUCCESSFUL + def is_async_query_successful(self, token: str) -> Optional[bool]: + status = self._get_async_query_status(token) + if status == ASYNC_QUERY_STATUS_RUNNING: + return None + return status == ASYNC_QUERY_STATUS_SUCCESSFUL # Context manager support def __enter__(self) -> Connection: diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py index a92bb30b5db..46974224299 100644 --- a/tests/integration/dbapi/sync/V2/test_server_async.py +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -4,6 +4,8 @@ from firebolt.db import Connection +LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec + def test_insert_async(connection: Connection) -> None: cursor = connection.cursor() @@ -27,6 +29,21 @@ def test_insert_async(connection: Connection) -> None: cursor.execute(f"DROP TABLE {table_name}") +def test_insert_async_running(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + assert connection.is_async_query_running(token) == True + assert connection.is_async_query_successful(token) is None + finally: + cursor.execute(f"DROP TABLE {table_name}") + + def test_check_async_execution_from_another_connection( connection_factory: Callable[..., Connection] ) -> None: From dea7fce5f6354eff55866491341119178211e67b Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 11:34:07 +0000 Subject: [PATCH 11/19] async tests --- src/firebolt/async_db/connection.py | 4 +- tests/integration/dbapi/async/V2/conftest.py | 23 +++++- .../dbapi/async/V2/test_server_async.py | 76 +++++++++++++++++++ 3 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 tests/integration/dbapi/async/V2/test_server_async.py diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 468c7c5b165..a4d81046416 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -108,8 +108,10 @@ async def is_async_query_running(self, token: str) -> bool: status = await self._get_async_query_status(token) return status == ASYNC_QUERY_STATUS_RUNNING - async def is_async_query_successful(self, token: str) -> bool: + async def is_async_query_successful(self, token: str) -> Optional[bool]: status = await self._get_async_query_status(token) + if status == ASYNC_QUERY_STATUS_RUNNING: + return None return status == ASYNC_QUERY_STATUS_SUCCESSFUL # Context manager support diff --git a/tests/integration/dbapi/async/V2/conftest.py b/tests/integration/dbapi/async/V2/conftest.py index edfd6f96a4d..a379f0c667f 100644 --- a/tests/integration/dbapi/async/V2/conftest.py +++ b/tests/integration/dbapi/async/V2/conftest.py @@ -1,6 +1,6 @@ import random import string -from typing import Tuple +from typing import Any, Callable, Tuple from pytest import fixture @@ -28,6 +28,27 @@ async def connection( yield connection +@fixture +async def connection_factory( + engine_name: str, + database_name: str, + auth: Auth, + account_name: str, + api_endpoint: str, +) -> Callable[..., Connection]: + async def factory(**kwargs: Any) -> Connection: + return await connect( + engine_name=engine_name, + database=database_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + **kwargs, + ) + + return factory + + @fixture async def connection_no_db( engine_name: str, diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py new file mode 100644 index 00000000000..d0869925a4f --- /dev/null +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -0,0 +1,76 @@ +import time +from random import randint +from typing import Callable + +from firebolt.db import Connection + +LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec + + +async def test_insert_async(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + await cursor.execute_async( + f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" + ) + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection.is_async_query_running(token) == False + assert await connection.is_async_query_successful(token) == True + # Verify the result + cursor = connection.cursor() + await cursor.execute(f"SELECT * FROM {table_name}") + result = await cursor.fetchall() + assert result == [[1, "test"]] + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_insert_async_running(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + await cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + assert await connection.is_async_query_running(token) == True + assert await connection.is_async_query_successful(token) is None + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_check_async_execution_from_another_connection( + connection_factory: Callable[..., Connection] +) -> None: + connection_1 = await connection_factory() + connection_2 = await connection_factory() + cursor = connection_1.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + await cursor.execute_async( + f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" + ) + token = cursor.async_query_token + assert token is not None, "Asyc token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection_2.is_async_query_running(token) == False + assert await connection_2.is_async_query_successful(token) == True + # Verify the result + cursor = connection_2.cursor() + await cursor.execute(f"SELECT * FROM {table_name}") + result = await cursor.fetchall() + assert result == [[1, "test"]] + finally: + await cursor.execute(f"DROP TABLE {table_name}") + await connection_1.aclose() + await connection_2.aclose() From b19c8604bda8bf8e6b403b042e7c6295599f62e6 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 11:40:50 +0000 Subject: [PATCH 12/19] fix tests --- tests/unit/async_db/test_connection.py | 6 +++--- tests/unit/db/test_connection.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 07e81971e36..801f9353962 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -1,4 +1,4 @@ -from typing import Callable, List, Tuple +from typing import Callable, List, Optional, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -411,7 +411,7 @@ async def test_connect_no_user_agent( @mark.parametrize( "server_status,expected_running,expected_success", [ - ("RUNNING", True, False), + ("RUNNING", True, None), ("ENDED_SUCCESSFULLY", False, True), ("FAILED", False, False), ("CANCELLED", False, False), @@ -431,7 +431,7 @@ async def test_is_async_query_running_success( mock_connection_flow: Callable, server_status: str, expected_running: bool, - expected_success: bool, + expected_success: Optional[bool], ): """Test is_async_query_running method""" mock_connection_flow() diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index f36694e8829..87c21e3aba7 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -1,6 +1,6 @@ import gc import warnings -from typing import Callable, List, Tuple +from typing import Callable, List, Optional, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -429,7 +429,7 @@ def test_connect_no_user_agent( @mark.parametrize( "server_status,expected_running,expected_success", [ - ("RUNNING", True, False), + ("RUNNING", True, None), ("ENDED_SUCCESSFULLY", False, True), ("FAILED", False, False), ("CANCELLED", False, False), @@ -449,7 +449,7 @@ def test_is_async_query_running_success( mock_connection_flow: Callable, server_status: str, expected_running: bool, - expected_success: bool, + expected_success: Optional[bool], ): """Test is_async_query_running method""" mock_connection_flow() From 52b02d19165b14d5fb281d391785c41a9549810c Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 24 Jan 2025 16:18:09 +0000 Subject: [PATCH 13/19] Adding documentation and docstrings --- docsrc/Connecting_and_queries.rst | 62 +++++++++++++++++++++++++++++ src/firebolt/async_db/connection.py | 19 +++++++++ src/firebolt/async_db/cursor.py | 26 ++++++++++++ src/firebolt/db/connection.py | 19 +++++++++ src/firebolt/db/cursor.py | 26 ++++++++++++ 5 files changed, 152 insertions(+) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 04cef199a9a..5db3b34fd35 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -594,6 +594,68 @@ load on both server and client machines can be controlled. A suggested way is to run(run_multiple_queries()) +Server-side asynchronous query execution +========================================== +Firebolt supports server-side asynchronous query execution. This feature allows you to run +queries in the background and fetch the results later. This is especially useful for long-running +queries that you don't want to wait for or maintain a persistent connection to the server. + +This feature is not to be confused with the Python SDK's asynchronous functionality, which is +described in the :ref:`Asynchronous query execution ` section, +used to write concurrent code. Server-side asynchronous query execution is a feature of the +Firebolt engine itself. + +Submitting an asynchronous query +-------------------------------- + +Use :py:meth:`firebolt.db.cursor.Cursor.execute_async` method to run query without maintaing a persistent connection. +This method will return immediately, and the query will be executed in the background. Return value +of execute_async is -1, which is the rowcount for queries where it's not applicable. +`cursor.async_query_token` attribute will contain a token that can be used to monitor the query status. + +:: + + # Synchronous execution + cursor.execute("CREATE TABLE my_table (id INT, name TEXT, date_value DATE)") + + # Asynchronous execution + cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')") + token = cursor.async_query_token + +Trying to access `async_query_token` before calling `execute_async` will raise an exception. + +.. note:: + Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement + separately using multiple `execute_async` calls. + +.. note:: + Fetching data via SELECT is not supported and will raise an exception. execute_async is best suited for DML queries. + +Monitoring the query status +---------------------------- + +To check the async query status you need to retrieve the token of the query. The token is a unique +identifier for the query and can be used to fetch the query status. You can store this token +outside of the current process and use it later to check the query status. :ref:`Connection ` object +has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and +:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True +if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query +has finished successfully, None if query is still running and False if the query has failed. + +:: + + while(connection.is_async_query_running(token)): + print("Query is still running") + time.sleep(1) + print("Query has finished") + + success = connection.is_async_query_successful(token) + if success: + print("Query was successful") + else: + print("Query failed") + + Thread safety ============================== diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index a4d81046416..49a31dc6650 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -105,10 +105,29 @@ async def _get_async_query_status(self, token: str) -> str: return str(result_dict.get("status")) async def is_async_query_running(self, token: str) -> bool: + """ + Check if an async query is still running. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: True if async query is still running, False otherwise + """ status = await self._get_async_query_status(token) return status == ASYNC_QUERY_STATUS_RUNNING async def is_async_query_successful(self, token: str) -> Optional[bool]: + """ + Check if an async query has finished and was successful. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: None if the query is still running, True if successful, + False otherwise + """ status = await self._get_async_query_status(token) if status == ASYNC_QUERY_STATUS_RUNNING: return None diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index e5d264c0049..3747fee2932 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -428,6 +428,32 @@ async def execute_async( parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, ) -> int: + """ + Execute a database query without maintating a connection. + + Supported features: + Parameterized queries: placeholder characters ('?') are substituted + with values provided in `parameters`. Values are formatted to + be properly recognized by database and to exclude SQL injection. + + Not supported: + Multi-statement queries: multiple statements, provided in a single query + and separated by semicolon. + SET statements: to provide additional query execution parameters, execute + `SET param=value` statement before it. Use `execute` method to set + parameters. + + Args: + query (str): SQL query to execute + parameters (Optional[Sequence[ParameterType]]): A sequence of substitution + parameters. Used to replace '?' placeholders inside a query with + actual values + skip_parsing (bool): Flag to disable query parsing. This will + disable parameterized queries while potentially improving performance + + Returns: + int: Always returns -1, as async execution does not return row count. + """ await self._do_execute( query, [parameters] if parameters else [], diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index effed1fff5a..aeab156dd9d 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -242,9 +242,28 @@ def _get_async_query_status(self, token: str) -> str: return result_dict["status"] def is_async_query_running(self, token: str) -> bool: + """ + Check if an async query is still running. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: True if async query is still running, False otherwise + """ return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_RUNNING def is_async_query_successful(self, token: str) -> Optional[bool]: + """ + Check if an async query has finished and was successful. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: None if the query is still running, True if successful, + False otherwise + """ status = self._get_async_query_status(token) if status == ASYNC_QUERY_STATUS_RUNNING: return None diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 4678f5afe3a..5cbc61c809b 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -192,6 +192,32 @@ def execute_async( parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, ) -> int: + """ + Execute a database query without maintating a connection. + + Supported features: + Parameterized queries: placeholder characters ('?') are substituted + with values provided in `parameters`. Values are formatted to + be properly recognized by database and to exclude SQL injection. + + Not supported: + Multi-statement queries: multiple statements, provided in a single query + and separated by semicolon. + SET statements: to provide additional query execution parameters, execute + `SET param=value` statement before it. Use `execute` method to set + parameters. + + Args: + query (str): SQL query to execute + parameters (Optional[Sequence[ParameterType]]): A sequence of substitution + parameters. Used to replace '?' placeholders inside a query with + actual values + skip_parsing (bool): Flag to disable query parsing. This will + disable parameterized queries while potentially improving performance + + Returns: + int: Always returns -1, as async execution does not return row count. + """ self._do_execute( query, [parameters] if parameters else [], From 4a001d673dc0c681998396b4cfda45fce8cc80e1 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 09:33:15 +0000 Subject: [PATCH 14/19] logging --- src/firebolt/async_db/cursor.py | 2 ++ src/firebolt/db/cursor.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 3747fee2932..408a0a5dc7a 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -242,6 +242,8 @@ async def _execute_single_query( f"Query fetched {self.rowcount} rows in" f" {time.time() - start_time} seconds." ) + else: + logger.info("Query submitted for async execution.") async def _handle_query_execution( self, query: str, timeout_controller: TimeoutController, async_execution: bool diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 5cbc61c809b..c0d35e0e568 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -277,6 +277,8 @@ def _execute_single_query( f"Query fetched {self.rowcount} rows in" f" {time.time() - start_time} seconds." ) + else: + logger.info("Query submitted for async execution.") def _handle_query_execution( self, query: str, timeout_controller: TimeoutController, async_execution: bool From 017595548ba602d3fd20b89c4507c2656cf4ab24 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 09:38:01 +0000 Subject: [PATCH 15/19] async typo --- tests/integration/dbapi/async/V2/test_server_async.py | 6 +++--- tests/integration/dbapi/sync/V2/test_server_async.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py index d0869925a4f..8ee395fab6f 100644 --- a/tests/integration/dbapi/async/V2/test_server_async.py +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -17,7 +17,7 @@ async def test_insert_async(connection: Connection) -> None: f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" ) token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" # sleep for 2 sec to make sure the async query is completed time.sleep(2) assert await connection.is_async_query_running(token) == False @@ -39,7 +39,7 @@ async def test_insert_async_running(connection: Connection) -> None: await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") await cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" assert await connection.is_async_query_running(token) == True assert await connection.is_async_query_successful(token) is None finally: @@ -60,7 +60,7 @@ async def test_check_async_execution_from_another_connection( f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" ) token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" # sleep for 2 sec to make sure the async query is completed time.sleep(2) assert await connection_2.is_async_query_running(token) == False diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py index 46974224299..ea227a402e4 100644 --- a/tests/integration/dbapi/sync/V2/test_server_async.py +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -15,7 +15,7 @@ def test_insert_async(connection: Connection) -> None: cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" # sleep for 2 sec to make sure the async query is completed time.sleep(2) assert connection.is_async_query_running(token) == False @@ -37,7 +37,7 @@ def test_insert_async_running(connection: Connection) -> None: cursor.execute(f"CREATE TABLE {table_name} (id LONG)") cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" assert connection.is_async_query_running(token) == True assert connection.is_async_query_successful(token) is None finally: @@ -56,7 +56,7 @@ def test_check_async_execution_from_another_connection( cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") token = cursor.async_query_token - assert token is not None, "Asyc token was not returned" + assert token is not None, "Async token was not returned" # sleep for 2 sec to make sure the async query is completed time.sleep(2) assert connection_2.is_async_query_running(token) == False From 35e52f9ff9bfcc5a2fc2b1549b3c7614d3ac4c09 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 09:45:35 +0000 Subject: [PATCH 16/19] clarification in docs --- docsrc/Connecting_and_queries.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 5db3b34fd35..406ea953032 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -650,6 +650,11 @@ has finished successfully, None if query is still running and False if the query print("Query has finished") success = connection.is_async_query_successful(token) + # success is None if the query is still running + if success is None: + # we should not reach this point since we've waited for is_async_query_running + raise Exception("The query is still running, use is_async_query_running to check the status") + if success: print("Query was successful") else: From e85f58b0de6d2b355432672cef196ed108097e84 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 11:47:42 +0000 Subject: [PATCH 17/19] Ensure no async call in v1 --- src/firebolt/async_db/cursor.py | 21 +++++++ src/firebolt/db/cursor.py | 89 +++++++++++++++++---------- tests/unit/V1/async_db/test_cursor.py | 8 +++ tests/unit/V1/db/test_cursor.py | 8 +++ 4 files changed, 92 insertions(+), 34 deletions(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 408a0a5dc7a..4769452fda0 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -50,6 +50,7 @@ EngineNotRunningError, FireboltDatabaseError, FireboltError, + NotSupportedError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -187,6 +188,16 @@ async def _parse_response_headers(self, headers: Headers) -> None: param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER)) self._update_set_parameters(param_dict) + @abstractmethod + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """Execute a database query without maintaining a connection.""" + ... + async def _do_execute( self, raw_query: str, @@ -540,3 +551,13 @@ async def _filter_request(self, endpoint: str, filters: dict) -> Response: ) resp.raise_for_status() return resp + + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + raise NotSupportedError( + "Async execution is not supported in this version " " of Firebolt." + ) diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index c0d35e0e568..1dbbc4d971f 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -48,6 +48,7 @@ EngineNotRunningError, FireboltDatabaseError, FireboltError, + NotSupportedError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -185,46 +186,15 @@ def _parse_response_headers(self, headers: Headers) -> None: param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER)) self._update_set_parameters(param_dict) - @check_not_closed + @abstractmethod def execute_async( self, query: str, parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, ) -> int: - """ - Execute a database query without maintating a connection. - - Supported features: - Parameterized queries: placeholder characters ('?') are substituted - with values provided in `parameters`. Values are formatted to - be properly recognized by database and to exclude SQL injection. - - Not supported: - Multi-statement queries: multiple statements, provided in a single query - and separated by semicolon. - SET statements: to provide additional query execution parameters, execute - `SET param=value` statement before it. Use `execute` method to set - parameters. - - Args: - query (str): SQL query to execute - parameters (Optional[Sequence[ParameterType]]): A sequence of substitution - parameters. Used to replace '?' placeholders inside a query with - actual values - skip_parsing (bool): Flag to disable query parsing. This will - disable parameterized queries while potentially improving performance - - Returns: - int: Always returns -1, as async execution does not return row count. - """ - self._do_execute( - query, - [parameters] if parameters else [], - skip_parsing, - async_execution=True, - ) - return -1 + """Execute a database query without maintaining a connection.""" + ... def _do_execute( self, @@ -433,6 +403,47 @@ def is_engine_running(self, engine_url: str) -> bool: # so we can't check if it's running return True + @check_not_closed + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """ + Execute a database query without maintating a connection. + + Supported features: + Parameterized queries: placeholder characters ('?') are substituted + with values provided in `parameters`. Values are formatted to + be properly recognized by database and to exclude SQL injection. + + Not supported: + Multi-statement queries: multiple statements, provided in a single query + and separated by semicolon. + SET statements: to provide additional query execution parameters, execute + `SET param=value` statement before it. Use `execute` method to set + parameters. + + Args: + query (str): SQL query to execute + parameters (Optional[Sequence[ParameterType]]): A sequence of substitution + parameters. Used to replace '?' placeholders inside a query with + actual values + skip_parsing (bool): Flag to disable query parsing. This will + disable parameterized queries while potentially improving performance + + Returns: + int: Always returns -1, as async execution does not return row count. + """ + self._do_execute( + query, + [parameters] if parameters else [], + skip_parsing, + async_execution=True, + ) + return -1 + class CursorV1(Cursor): def __init__( @@ -481,3 +492,13 @@ def _filter_request(self, endpoint: str, filters: dict) -> Response: ) resp.raise_for_status() return resp + + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + raise NotSupportedError( + "Async execution is not supported in this version " " of Firebolt." + ) diff --git a/tests/unit/V1/async_db/test_cursor.py b/tests/unit/V1/async_db/test_cursor.py index abea8007a6b..825044fe696 100644 --- a/tests/unit/V1/async_db/test_cursor.py +++ b/tests/unit/V1/async_db/test_cursor.py @@ -14,6 +14,7 @@ DataError, EngineNotRunningError, FireboltDatabaseError, + NotSupportedError, OperationalError, QueryNotRunError, ) @@ -677,3 +678,10 @@ async def test_disallowed_set_parameter(cursor: Cursor, parameter: str) -> None: e.value ), "invalid error" assert cursor._set_parameters == {}, "set parameters should not be updated" + + +async def test_cursor_execute_async_raises(cursor: Cursor) -> None: + """Test that calling execute_async raises an error.""" + with raises(NotSupportedError) as e: + await cursor.execute_async("select 1") + assert "Async execution is not supported" in str(e.value), "invalid error" diff --git a/tests/unit/V1/db/test_cursor.py b/tests/unit/V1/db/test_cursor.py index 4510f5a9710..9382977e8f6 100644 --- a/tests/unit/V1/db/test_cursor.py +++ b/tests/unit/V1/db/test_cursor.py @@ -12,6 +12,7 @@ ConfigurationError, CursorClosedError, DataError, + NotSupportedError, OperationalError, QueryNotRunError, ) @@ -623,3 +624,10 @@ def test_disallowed_set_parameter(cursor: Cursor, parameter: str) -> None: e.value ), "invalid error" assert cursor._set_parameters == {}, "set parameters should not be updated" + + +def test_cursor_execute_async_raises(cursor: Cursor) -> None: + """Test that calling execute_async raises an error.""" + with raises(NotSupportedError) as e: + cursor.execute_async("select 1") + assert "Async execution is not supported" in str(e.value), "invalid error" From c0380e2ce0a8d264f2f114f5b3382cac87ba81ce Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 13:08:29 +0000 Subject: [PATCH 18/19] additional tests --- .../dbapi/async/V2/test_server_async.py | 28 +++++++++++++++++++ .../dbapi/sync/V2/test_server_async.py | 28 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py index 8ee395fab6f..2875fc07778 100644 --- a/tests/integration/dbapi/async/V2/test_server_async.py +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -2,7 +2,10 @@ from random import randint from typing import Callable +from pytest import raises + from firebolt.db import Connection +from firebolt.utils.exception import FireboltError, FireboltStructuredError LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec @@ -74,3 +77,28 @@ async def test_check_async_execution_from_another_connection( await cursor.execute(f"DROP TABLE {table_name}") await connection_1.aclose() await connection_2.aclose() + + +async def test_check_async_query_fails(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + await cursor.execute_async(f"INSERT INTO {table_name} VALUES ('string')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection.is_async_query_running(token) == False + assert await connection.is_async_query_successful(token) == False + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_check_async_execution_fails(connection: Connection) -> None: + cursor = connection.cursor() + with raises(FireboltStructuredError): + await cursor.execute_async(f"MALFORMED QUERY") + with raises(FireboltError): + cursor.async_query_token diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py index ea227a402e4..cb5b41eb12c 100644 --- a/tests/integration/dbapi/sync/V2/test_server_async.py +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -2,7 +2,10 @@ from random import randint from typing import Callable +from pytest import raises + from firebolt.db import Connection +from firebolt.utils.exception import FireboltError, FireboltStructuredError LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec @@ -70,3 +73,28 @@ def test_check_async_execution_from_another_connection( cursor.execute(f"DROP TABLE {table_name}") connection_1.close() connection_2.close() + + +def test_check_async_query_fails(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + cursor.execute_async(f"INSERT INTO {table_name} VALUES ('string')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection.is_async_query_running(token) == False + assert connection.is_async_query_successful(token) == False + finally: + cursor.execute(f"DROP TABLE {table_name}") + + +def test_check_async_execution_fails(connection: Connection) -> None: + cursor = connection.cursor() + with raises(FireboltStructuredError): + cursor.execute_async(f"MALFORMED QUERY") + with raises(FireboltError): + cursor.async_query_token From 8af78f8173386a1bc4f91120442aaacf19f4386f Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 27 Jan 2025 14:13:35 +0000 Subject: [PATCH 19/19] Refactor check into another decorator --- src/firebolt/async_db/cursor.py | 3 +++ src/firebolt/common/base_cursor.py | 19 +++++++++++++++++++ src/firebolt/db/cursor.py | 2 ++ 3 files changed, 24 insertions(+) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 4769452fda0..c1c00e028b5 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -43,6 +43,7 @@ _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, + async_not_allowed, check_not_closed, check_query_executed, ) @@ -388,6 +389,7 @@ async def nextset(self) -> None: # Iteration support @check_not_closed + @async_not_allowed @check_query_executed def __aiter__(self) -> Cursor: return self @@ -415,6 +417,7 @@ async def __aexit__( self.close() @check_not_closed + @async_not_allowed @check_query_executed async def __anext__(self) -> List[ColType]: row = await self.fetchone() diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index 1e50cad0faa..61b4844ec2a 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -159,6 +159,21 @@ def inner(self: BaseCursor, *args: Any, **kwargs: Any) -> Any: return inner +def async_not_allowed(func: Callable) -> Callable: + """ + (Decorator) ensure that fetch methods are not called on async queries. + """ + + @wraps(func) + def inner(self: BaseCursor, *args: Any, **kwargs: Any) -> Any: + if self._query_token: + # query_token is set only for async queries + raise MethodNotAllowedInAsyncError(method_name=func.__name__) + return func(self, *args, **kwargs) + + return inner + + class BaseCursor: __slots__ = ( "connection", @@ -272,6 +287,7 @@ def closed(self) -> bool: return self._state == CursorState.CLOSED @check_not_closed + @async_not_allowed @check_query_executed def nextset(self) -> Optional[bool]: """ @@ -423,6 +439,7 @@ def _get_next_range(self, size: int) -> Tuple[int, int]: ) @check_not_closed + @async_not_allowed @check_query_executed def fetchone(self) -> Optional[List[ColType]]: """Fetch the next row of a query result set.""" @@ -436,6 +453,7 @@ def fetchone(self) -> Optional[List[ColType]]: return result @check_not_closed + @async_not_allowed @check_query_executed def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: """ @@ -451,6 +469,7 @@ def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: return result @check_not_closed + @async_not_allowed @check_query_executed def fetchall(self) -> List[List[ColType]]: """Fetch all remaining rows of a query result.""" diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 1dbbc4d971f..c4a1028a2e5 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -41,6 +41,7 @@ _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, + async_not_allowed, check_not_closed, check_query_executed, ) @@ -358,6 +359,7 @@ def is_engine_running(self, engine_url: str) -> bool: # Iteration support @check_not_closed + @async_not_allowed @check_query_executed def __iter__(self) -> Generator[List[ColType], None, None]: while True: