diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 04cef199a9a..406ea953032 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -594,6 +594,73 @@ load on both server and client machines can be controlled. A suggested way is to run(run_multiple_queries()) +Server-side asynchronous query execution +========================================== +Firebolt supports server-side asynchronous query execution. This feature allows you to run +queries in the background and fetch the results later. This is especially useful for long-running +queries that you don't want to wait for or maintain a persistent connection to the server. + +This feature is not to be confused with the Python SDK's asynchronous functionality, which is +described in the :ref:`Asynchronous query execution ` section, +used to write concurrent code. Server-side asynchronous query execution is a feature of the +Firebolt engine itself. + +Submitting an asynchronous query +-------------------------------- + +Use :py:meth:`firebolt.db.cursor.Cursor.execute_async` method to run a query without maintaining a persistent connection. +This method will return immediately, and the query will be executed in the background. Return value +of execute_async is -1, which is the rowcount for queries where it's not applicable. +`cursor.async_query_token` attribute will contain a token that can be used to monitor the query status. + +:: + + # Synchronous execution + cursor.execute("CREATE TABLE my_table (id INT, name TEXT, date_value DATE)") + + # Asynchronous execution + cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')") + token = cursor.async_query_token + +Trying to access `async_query_token` before calling `execute_async` will raise an exception. + +.. note:: + Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement + separately using multiple `execute_async` calls. + +.. 
note:: + Fetching data via SELECT is not supported and will raise an exception. execute_async is best suited for DML queries. + +Monitoring the query status +---------------------------- + +To check the async query status you need to retrieve the token of the query. The token is a unique +identifier for the query and can be used to fetch the query status. You can store this token +outside of the current process and use it later to check the query status. :ref:`Connection ` object +has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and +:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`. `is_async_query_running` will return True +if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query +has finished successfully, None if the query is still running, and False if the query has failed. + +:: + + while(connection.is_async_query_running(token)): + print("Query is still running") + time.sleep(1) + print("Query has finished") + + success = connection.is_async_query_successful(token) + # success is None if the query is still running + if success is None: + # we should not reach this point since we've waited for is_async_query_running + raise Exception("The query is still running, use is_async_query_running to check the status") + + if success: + print("Query was successful") + else: + print("Query failed") + + Thread safety ============================== diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index e98b7d1e237..49a31dc6650 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -10,10 +10,19 @@ from firebolt.client import DEFAULT_API_URL from firebolt.client.auth import Auth from firebolt.client.client import AsyncClient, AsyncClientV1, AsyncClientV2 -from firebolt.common.base_connection import BaseConnection +from firebolt.common.base_connection import ( + 
ASYNC_QUERY_STATUS_REQUEST, + ASYNC_QUERY_STATUS_RUNNING, + ASYNC_QUERY_STATUS_SUCCESSFUL, + BaseConnection, +) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS -from firebolt.utils.exception import ConfigurationError, ConnectionClosedError +from firebolt.utils.exception import ( + ConfigurationError, + ConnectionClosedError, + FireboltError, +) from firebolt.utils.usage_tracker import get_user_agent_header from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 @@ -63,10 +72,9 @@ def __init__( api_endpoint: str, init_parameters: Optional[Dict[str, Any]] = None, ): - super().__init__() + super().__init__(cursor_type) self.api_endpoint = api_endpoint self.engine_url = engine_url - self.cursor_type = cursor_type self._cursors: List[Cursor] = [] self._client = client self.init_parameters = init_parameters or {} @@ -81,6 +89,50 @@ def cursor(self, **kwargs: Any) -> Cursor: self._cursors.append(c) return c + # Server-side async methods + async def _get_async_query_status(self, token: str) -> str: + if self.cursor_type != CursorV2: + raise FireboltError( + "This method is only supported for connection with service account." + ) + cursor = self.cursor() + await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + result = await cursor.fetchone() + if cursor.rowcount != 1 or not result: + raise FireboltError("Unexpected result from async query status request.") + columns = cursor.description + result_dict = dict(zip([column.name for column in columns], result)) + return str(result_dict.get("status")) + + async def is_async_query_running(self, token: str) -> bool: + """ + Check if an async query is still running. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. 
+ + Returns: + bool: True if async query is still running, False otherwise + """ + status = await self._get_async_query_status(token) + return status == ASYNC_QUERY_STATUS_RUNNING + + async def is_async_query_successful(self, token: str) -> Optional[bool]: + """ + Check if an async query has finished and was successful. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: None if the query is still running, True if successful, + False otherwise + """ + status = await self._get_async_query_status(token) + if status == ASYNC_QUERY_STATUS_RUNNING: + return None + return status == ASYNC_QUERY_STATUS_SUCCESSFUL + # Context manager support async def __aenter__(self) -> Connection: if self.closed: diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 0c0385169b2..c1c00e028b5 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -8,6 +8,7 @@ from typing import ( TYPE_CHECKING, Any, + Dict, Iterator, List, Optional, @@ -39,16 +40,18 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - RowSet, _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, + async_not_allowed, check_not_closed, check_query_executed, ) from firebolt.utils.exception import ( EngineNotRunningError, FireboltDatabaseError, + FireboltError, + NotSupportedError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -186,53 +189,93 @@ async def _parse_response_headers(self, headers: Headers) -> None: param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER)) self._update_set_parameters(param_dict) + @abstractmethod + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """Execute a database query without maintaining a connection.""" + ... 
+ async def _do_execute( self, raw_query: str, parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, timeout: Optional[float] = None, + async_execution: bool = False, ) -> None: self._reset() - # Allow users to manually skip parsing for performance improvement. queries: List[Union[SetParameter, str]] = ( [raw_query] if skip_parsing else split_format_sql(raw_query, parameters) ) timeout_controller = TimeoutController(timeout) + + if len(queries) > 1 and async_execution: + raise FireboltError( + "Server side async does not support multi-statement queries" + ) try: for query in queries: - start_time = time.time() - Cursor._log_query(query) - timeout_controller.raise_if_timeout() - - if isinstance(query, SetParameter): - row_set: RowSet = (-1, None, None, None) - await self._validate_set_parameter( - query, timeout_controller.remaining() - ) - else: - resp = await self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT}, - timeout=timeout_controller.remaining(), - ) - await self._raise_if_error(resp) - await self._parse_response_headers(resp.headers) - row_set = self._row_set_from_response(resp) - - self._append_row_set(row_set) - - logger.info( - f"Query fetched {self.rowcount} rows in" - f" {time.time() - start_time} seconds." 
+ await self._execute_single_query( + query, timeout_controller, async_execution ) - self._state = CursorState.DONE - except Exception: self._state = CursorState.ERROR raise + async def _execute_single_query( + self, + query: Union[SetParameter, str], + timeout_controller: TimeoutController, + async_execution: bool, + ) -> None: + start_time = time.time() + Cursor._log_query(query) + timeout_controller.raise_if_timeout() + + if isinstance(query, SetParameter): + if async_execution: + raise FireboltError( + "Server side async does not support set statements, " + "please use execute to set this parameter" + ) + await self._validate_set_parameter(query, timeout_controller.remaining()) + else: + await self._handle_query_execution( + query, timeout_controller, async_execution + ) + + if not async_execution: + logger.info( + f"Query fetched {self.rowcount} rows in" + f" {time.time() - start_time} seconds." + ) + else: + logger.info("Query submitted for async execution.") + + async def _handle_query_execution( + self, query: str, timeout_controller: TimeoutController, async_execution: bool + ) -> None: + query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT} + if async_execution: + query_params["async"] = True + resp = await self._api_request( + query, + query_params, + timeout=timeout_controller.remaining(), + ) + await self._raise_if_error(resp) + if async_execution: + self._parse_async_response(resp) + else: + await self._parse_response_headers(resp.headers) + row_set = self._row_set_from_response(resp) + self._append_row_set(row_set) + @check_not_closed async def execute( self, @@ -346,6 +389,7 @@ async def nextset(self) -> None: # Iteration support @check_not_closed + @async_not_allowed @check_query_executed def __aiter__(self) -> Cursor: return self @@ -373,6 +417,7 @@ async def __aexit__( self.close() @check_not_closed + @async_not_allowed @check_query_executed async def __anext__(self) -> List[ColType]: row = await self.fetchone() @@ -392,6 +437,47 
@@ def __init__( assert isinstance(client, AsyncClientV2) super().__init__(*args, client=client, connection=connection, **kwargs) + @check_not_closed + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """ + Execute a database query without maintaining a connection. + + Supported features: + Parameterized queries: placeholder characters ('?') are substituted + with values provided in `parameters`. Values are formatted to + be properly recognized by database and to exclude SQL injection. + + Not supported: + Multi-statement queries: multiple statements, provided in a single query + and separated by semicolon. + SET statements: to provide additional query execution parameters, execute + `SET param=value` statement before it. Use `execute` method to set + parameters. + + Args: + query (str): SQL query to execute + parameters (Optional[Sequence[ParameterType]]): A sequence of substitution + parameters. Used to replace '?' placeholders inside a query with + actual values + skip_parsing (bool): Flag to disable query parsing. This will + disable parameterized queries while potentially improving performance + + Returns: + int: Always returns -1, as async execution does not return row count. + """ + await self._do_execute( + query, + [parameters] if parameters else [], + skip_parsing, + async_execution=True, + ) + return -1 + async def is_db_available(self, database_name: str) -> bool: """ Verify that the database exists. @@ -468,3 +554,13 @@ async def _filter_request(self, endpoint: str, filters: dict) -> Response: ) resp.raise_for_status() return resp + + async def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + raise NotSupportedError( + "Async execution is not supported in this version " " of Firebolt." 
+ ) diff --git a/src/firebolt/client/constants.py b/src/firebolt/client/constants.py index 78d23c1bfa8..f5860e83d23 100644 --- a/src/firebolt/client/constants.py +++ b/src/firebolt/client/constants.py @@ -5,7 +5,7 @@ DEFAULT_API_URL: str = "api.app.firebolt.io" PROTOCOL_VERSION_HEADER_NAME = "Firebolt-Protocol-Version" -PROTOCOL_VERSION: str = "2.1" +PROTOCOL_VERSION: str = "2.3" _REQUEST_ERRORS: Tuple[Type, ...] = ( HTTPError, InvalidURL, diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 1d47374015c..80be32d4898 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,10 +1,15 @@ -from typing import Any, List +from typing import Any, List, Type from firebolt.utils.exception import ConnectionClosedError +ASYNC_QUERY_STATUS_RUNNING = "RUNNING" +ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" +ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus('{token}')" + class BaseConnection: - def __init__(self) -> None: + def __init__(self, cursor_type: Type) -> None: + self.cursor_type = cursor_type self._cursors: List[Any] = [] self._is_closed = False diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index d521df375a2..61b4844ec2a 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -22,6 +22,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, QueryNotRunError, ) from firebolt.utils.util import Timer, fix_url_schema @@ -85,6 +87,13 @@ def _raise_if_internal_set_parameter(parameter: SetParameter) -> None: ) +@dataclass +class AsyncResponse: + token: str + message: str + monitorSql: str + + @dataclass class Statistics: """ @@ -142,6 +151,24 @@ def check_query_executed(func: Callable) -> Callable: def inner(self: BaseCursor, *args: Any, **kwargs: Any) -> Any: if self._state == CursorState.NONE: raise QueryNotRunError(method_name=func.__name__) + if 
self._query_token: + # query_token is set only for async queries + raise MethodNotAllowedInAsyncError(method_name=func.__name__) + return func(self, *args, **kwargs) + + return inner + + +def async_not_allowed(func: Callable) -> Callable: + """ + (Decorator) ensure that fetch methods are not called on async queries. + """ + + @wraps(func) + def inner(self: BaseCursor, *args: Any, **kwargs: Any) -> Any: + if self._query_token: + # query_token is set only for async queries + raise MethodNotAllowedInAsyncError(method_name=func.__name__) return func(self, *args, **kwargs) return inner @@ -163,6 +190,7 @@ class BaseCursor: "_next_set_idx", "_set_parameters", "_query_id", + "_query_token", "engine_url", ) @@ -184,6 +212,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self._idx = 0 self._next_set_idx = 0 self._query_id = "" + self._query_token = "" self._reset() @property @@ -229,6 +258,15 @@ def query_id(self) -> str: """The query id of a query executed asynchronously.""" return self._query_id + @property + def async_query_token(self) -> str: + """The query token of a query executed asynchronously.""" + if not self._query_token: + raise FireboltError( + "No async query was executed or query was not an async." 
+ ) + return self._query_token + @property def arraysize(self) -> int: """Default number of rows returned by fetchmany.""" @@ -249,6 +287,7 @@ def closed(self) -> bool: return self._state == CursorState.CLOSED @check_not_closed + @async_not_allowed @check_query_executed def nextset(self) -> Optional[bool]: """ @@ -290,6 +329,7 @@ def _reset(self) -> None: self._row_sets = [] self._next_set_idx = 0 self._query_id = "" + self._query_token = "" def _update_set_parameters(self, parameters: Dict[str, Any]) -> None: # Split parameters into immutable and user parameters @@ -334,6 +374,11 @@ def engine_name(self) -> str: return self.parameters["engine"] return URL(self.engine_url).host.split(".")[0].replace("-", "_") + def _parse_async_response(self, response: Response) -> None: + """Handle async response from the server.""" + async_response = AsyncResponse(**response.json()) + self._query_token = async_response.token + def _row_set_from_response(self, response: Response) -> RowSet: """Fetch information about executed query from http response.""" @@ -394,6 +439,7 @@ def _get_next_range(self, size: int) -> Tuple[int, int]: ) @check_not_closed + @async_not_allowed @check_query_executed def fetchone(self) -> Optional[List[ColType]]: """Fetch the next row of a query result set.""" @@ -407,6 +453,7 @@ def fetchone(self) -> Optional[List[ColType]]: return result @check_not_closed + @async_not_allowed @check_query_executed def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: """ @@ -422,6 +469,7 @@ def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: return result @check_not_closed + @async_not_allowed @check_query_executed def fetchall(self) -> List[List[ColType]]: """Fetch all remaining rows of a query result.""" diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 9914e3174a8..aeab156dd9d 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -9,12 +9,21 @@ from firebolt.client import 
DEFAULT_API_URL, Client, ClientV1, ClientV2 from firebolt.client.auth import Auth -from firebolt.common.base_connection import BaseConnection +from firebolt.common.base_connection import ( + ASYNC_QUERY_STATUS_REQUEST, + ASYNC_QUERY_STATUS_RUNNING, + ASYNC_QUERY_STATUS_SUCCESSFUL, + BaseConnection, +) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS from firebolt.db.cursor import Cursor, CursorV1, CursorV2 from firebolt.db.util import _get_system_engine_url_and_params -from firebolt.utils.exception import ConfigurationError, ConnectionClosedError +from firebolt.utils.exception import ( + ConfigurationError, + ConnectionClosedError, + FireboltError, +) from firebolt.utils.usage_tracker import get_user_agent_header from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1 @@ -181,10 +190,9 @@ def __init__( api_endpoint: str = DEFAULT_API_URL, init_parameters: Optional[Dict[str, Any]] = None, ): - super().__init__() + super().__init__(cursor_type) self.api_endpoint = api_endpoint self.engine_url = engine_url - self.cursor_type = cursor_type self._cursors: List[Cursor] = [] self._client = client self.init_parameters = init_parameters or {} @@ -218,6 +226,49 @@ def close(self) -> None: self._client.close() self._is_closed = True + # Server-side async methods + def _get_async_query_status(self, token: str) -> str: + if self.cursor_type != CursorV2: + raise FireboltError( + "This method is only supported for connection with service account." 
+ ) + cursor = self.cursor() + cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + result = cursor.fetchone() + if cursor.rowcount != 1 or not result: + raise FireboltError("Unexpected result from async query status request.") + columns = cursor.description + result_dict = dict(zip([column.name for column in columns], result)) + return result_dict["status"] + + def is_async_query_running(self, token: str) -> bool: + """ + Check if an async query is still running. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: True if async query is still running, False otherwise + """ + return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_RUNNING + + def is_async_query_successful(self, token: str) -> Optional[bool]: + """ + Check if an async query has finished and was successful. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + + Returns: + bool: None if the query is still running, True if successful, + False otherwise + """ + status = self._get_async_query_status(token) + if status == ASYNC_QUERY_STATUS_RUNNING: + return None + return status == ASYNC_QUERY_STATUS_SUCCESSFUL + # Context manager support def __enter__(self) -> Connection: if self.closed: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 86943f9ae8a..c4a1028a2e5 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -6,6 +6,7 @@ from typing import ( TYPE_CHECKING, Any, + Dict, Generator, List, Optional, @@ -37,16 +38,18 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - RowSet, _parse_update_endpoint, _parse_update_parameters, _raise_if_internal_set_parameter, + async_not_allowed, check_not_closed, check_query_executed, ) from firebolt.utils.exception import ( EngineNotRunningError, FireboltDatabaseError, + FireboltError, + NotSupportedError, OperationalError, ProgrammingError, QueryTimeoutError, @@ -184,52 +187,89 @@ def _parse_response_headers(self, 
headers: Headers) -> None: param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER)) self._update_set_parameters(param_dict) + @abstractmethod + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """Execute a database query without maintaining a connection.""" + ... + def _do_execute( self, raw_query: str, parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, timeout: Optional[float] = None, + async_execution: bool = False, ) -> None: self._reset() - # Allow users to manually skip parsing for performance improvement. queries: List[Union[SetParameter, str]] = ( [raw_query] if skip_parsing else split_format_sql(raw_query, parameters) ) timeout_controller = TimeoutController(timeout) + if len(queries) > 1 and async_execution: + raise FireboltError( + "Server side async does not support multi-statement queries" + ) try: for query in queries: - start_time = time.time() - Cursor._log_query(query) - timeout_controller.raise_if_timeout() - - if isinstance(query, SetParameter): - row_set: RowSet = (-1, None, None, None) - self._validate_set_parameter(query, timeout_controller.remaining()) - else: - resp = self._api_request( - query, - {"output_format": JSON_OUTPUT_FORMAT}, - timeout=timeout_controller.remaining(), - ) - self._raise_if_error(resp) - self._parse_response_headers(resp.headers) - row_set = self._row_set_from_response(resp) - - self._append_row_set(row_set) - - logger.info( - f"Query fetched {self.rowcount} rows in" - f" {time.time() - start_time} seconds." 
- ) - + self._execute_single_query(query, timeout_controller, async_execution) self._state = CursorState.DONE - except Exception: self._state = CursorState.ERROR raise + def _execute_single_query( + self, + query: Union[SetParameter, str], + timeout_controller: TimeoutController, + async_execution: bool, + ) -> None: + start_time = time.time() + Cursor._log_query(query) + timeout_controller.raise_if_timeout() + + if isinstance(query, SetParameter): + if async_execution: + raise FireboltError( + "Server side async does not support set statements, " + "please use execute to set this parameter" + ) + self._validate_set_parameter(query, timeout_controller.remaining()) + else: + self._handle_query_execution(query, timeout_controller, async_execution) + + if not async_execution: + logger.info( + f"Query fetched {self.rowcount} rows in" + f" {time.time() - start_time} seconds." + ) + else: + logger.info("Query submitted for async execution.") + + def _handle_query_execution( + self, query: str, timeout_controller: TimeoutController, async_execution: bool + ) -> None: + query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT} + if async_execution: + query_params["async"] = True + resp = self._api_request( + query, + query_params, + timeout=timeout_controller.remaining(), + ) + self._raise_if_error(resp) + if async_execution: + self._parse_async_response(resp) + else: + self._parse_response_headers(resp.headers) + row_set = self._row_set_from_response(resp) + self._append_row_set(row_set) + @check_not_closed def execute( self, @@ -319,6 +359,7 @@ def is_engine_running(self, engine_url: str) -> bool: # Iteration support @check_not_closed + @async_not_allowed @check_query_executed def __iter__(self) -> Generator[List[ColType], None, None]: while True: @@ -364,6 +405,47 @@ def is_engine_running(self, engine_url: str) -> bool: # so we can't check if it's running return True + @check_not_closed + def execute_async( + self, + query: str, + parameters: 
Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + """ + Execute a database query without maintaining a connection. + + Supported features: + Parameterized queries: placeholder characters ('?') are substituted + with values provided in `parameters`. Values are formatted to + be properly recognized by database and to exclude SQL injection. + + Not supported: + Multi-statement queries: multiple statements, provided in a single query + and separated by semicolon. + SET statements: to provide additional query execution parameters, execute + `SET param=value` statement before it. Use `execute` method to set + parameters. + + Args: + query (str): SQL query to execute + parameters (Optional[Sequence[ParameterType]]): A sequence of substitution + parameters. Used to replace '?' placeholders inside a query with + actual values + skip_parsing (bool): Flag to disable query parsing. This will + disable parameterized queries while potentially improving performance + + Returns: + int: Always returns -1, as async execution does not return row count. + """ + self._do_execute( + query, + [parameters] if parameters else [], + skip_parsing, + async_execution=True, + ) + return -1 + class CursorV1(Cursor): def __init__( @@ -412,3 +494,13 @@ def _filter_request(self, endpoint: str, filters: dict) -> Response: ) resp.raise_for_status() return resp + + def execute_async( + self, + query: str, + parameters: Optional[Sequence[ParameterType]] = None, + skip_parsing: bool = False, + ) -> int: + raise NotSupportedError( + "Async execution is not supported in this version " " of Firebolt." 
+ ) diff --git a/src/firebolt/utils/exception.py b/src/firebolt/utils/exception.py index c03cb344ab5..457ee8ec16c 100644 --- a/src/firebolt/utils/exception.py +++ b/src/firebolt/utils/exception.py @@ -321,3 +321,17 @@ class QueryTimeoutError(FireboltError, TimeoutError): def __init__(self, message="Query execution timed out."): # type: ignore super().__init__(message) + + +class MethodNotAllowedInAsyncError(FireboltError): + """Method not allowed. + + Exception raised when the method is not allowed. + """ + + def __init__(self, method_name: str): + super().__init__( + f"Method {method_name} not allowed for an async query." + " Please get the token and use the async query API to get the status." + ) + self.method_name = method_name diff --git a/tests/integration/dbapi/async/V2/conftest.py b/tests/integration/dbapi/async/V2/conftest.py index edfd6f96a4d..a379f0c667f 100644 --- a/tests/integration/dbapi/async/V2/conftest.py +++ b/tests/integration/dbapi/async/V2/conftest.py @@ -1,6 +1,6 @@ import random import string -from typing import Tuple +from typing import Any, Callable, Tuple from pytest import fixture @@ -28,6 +28,27 @@ async def connection( yield connection +@fixture +async def connection_factory( + engine_name: str, + database_name: str, + auth: Auth, + account_name: str, + api_endpoint: str, +) -> Callable[..., Connection]: + async def factory(**kwargs: Any) -> Connection: + return await connect( + engine_name=engine_name, + database=database_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + **kwargs, + ) + + return factory + + @fixture async def connection_no_db( engine_name: str, diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py new file mode 100644 index 00000000000..2875fc07778 --- /dev/null +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -0,0 +1,104 @@ +import time +from random import randint +from typing import Callable + +from pytest 
import raises + +from firebolt.db import Connection +from firebolt.utils.exception import FireboltError, FireboltStructuredError + +LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec + + +async def test_insert_async(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + await cursor.execute_async( + f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" + ) + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection.is_async_query_running(token) == False + assert await connection.is_async_query_successful(token) == True + # Verify the result + cursor = connection.cursor() + await cursor.execute(f"SELECT * FROM {table_name}") + result = await cursor.fetchall() + assert result == [[1, "test"]] + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_insert_async_running(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + await cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + assert await connection.is_async_query_running(token) == True + assert await connection.is_async_query_successful(token) is None + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_check_async_execution_from_another_connection( + connection_factory: Callable[..., Connection] +) -> None: + connection_1 = await connection_factory() + connection_2 = await connection_factory() + cursor = connection_1.cursor() + rnd_suffix = 
str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + await cursor.execute_async( + f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')" + ) + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection_2.is_async_query_running(token) == False + assert await connection_2.is_async_query_successful(token) == True + # Verify the result + cursor = connection_2.cursor() + await cursor.execute(f"SELECT * FROM {table_name}") + result = await cursor.fetchall() + assert result == [[1, "test"]] + finally: + await cursor.execute(f"DROP TABLE {table_name}") + await connection_1.aclose() + await connection_2.aclose() + + +async def test_check_async_query_fails(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + await cursor.execute_async(f"INSERT INTO {table_name} VALUES ('string')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert await connection.is_async_query_running(token) == False + assert await connection.is_async_query_successful(token) == False + finally: + await cursor.execute(f"DROP TABLE {table_name}") + + +async def test_check_async_execution_fails(connection: Connection) -> None: + cursor = connection.cursor() + with raises(FireboltStructuredError): + await cursor.execute_async(f"MALFORMED QUERY") + with raises(FireboltError): + cursor.async_query_token diff --git a/tests/integration/dbapi/sync/V2/conftest.py b/tests/integration/dbapi/sync/V2/conftest.py index 96a3c19eb5c..a116657d4f1 100644 --- 
a/tests/integration/dbapi/sync/V2/conftest.py +++ b/tests/integration/dbapi/sync/V2/conftest.py @@ -1,6 +1,6 @@ import random import string -from typing import Tuple +from typing import Any, Callable, Tuple from pytest import fixture @@ -28,6 +28,27 @@ def connection( yield connection +@fixture +def connection_factory( + engine_name: str, + database_name: str, + auth: Auth, + account_name: str, + api_endpoint: str, +) -> Callable[..., Connection]: + def factory(**kwargs: Any) -> Connection: + return connect( + engine_name=engine_name, + database=database_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + **kwargs, + ) + + return factory + + @fixture def connection_no_db( engine_name: str, diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py new file mode 100644 index 00000000000..cb5b41eb12c --- /dev/null +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -0,0 +1,100 @@ +import time +from random import randint +from typing import Callable + +from pytest import raises + +from firebolt.db import Connection +from firebolt.utils.exception import FireboltError, FireboltStructuredError + +LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec + + +def test_insert_async(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection.is_async_query_running(token) == False + assert connection.is_async_query_successful(token) == True + # Verify the result + cursor = connection.cursor() + cursor.execute(f"SELECT * 
FROM {table_name}") + result = cursor.fetchall() + assert result == [[1, "test"]] + finally: + cursor.execute(f"DROP TABLE {table_name}") + + +def test_insert_async_running(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + assert connection.is_async_query_running(token) == True + assert connection.is_async_query_successful(token) is None + finally: + cursor.execute(f"DROP TABLE {table_name}") + + +def test_check_async_execution_from_another_connection( + connection_factory: Callable[..., Connection] +) -> None: + connection_1 = connection_factory() + connection_2 = connection_factory() + cursor = connection_1.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id INT, name TEXT)") + cursor.execute_async(f"INSERT INTO {table_name} (id, name) VALUES (1, 'test')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection_2.is_async_query_running(token) == False + assert connection_2.is_async_query_successful(token) == True + # Verify the result + cursor = connection_2.cursor() + cursor.execute(f"SELECT * FROM {table_name}") + result = cursor.fetchall() + assert result == [[1, "test"]] + finally: + cursor.execute(f"DROP TABLE {table_name}") + connection_1.close() + connection_2.close() + + +def test_check_async_query_fails(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} 
(id LONG)") + cursor.execute_async(f"INSERT INTO {table_name} VALUES ('string')") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + # sleep for 2 sec to make sure the async query is completed + time.sleep(2) + assert connection.is_async_query_running(token) == False + assert connection.is_async_query_successful(token) == False + finally: + cursor.execute(f"DROP TABLE {table_name}") + + +def test_check_async_execution_fails(connection: Connection) -> None: + cursor = connection.cursor() + with raises(FireboltStructuredError): + cursor.execute_async(f"MALFORMED QUERY") + with raises(FireboltError): + cursor.async_query_token diff --git a/tests/unit/V1/async_db/test_cursor.py b/tests/unit/V1/async_db/test_cursor.py index abea8007a6b..825044fe696 100644 --- a/tests/unit/V1/async_db/test_cursor.py +++ b/tests/unit/V1/async_db/test_cursor.py @@ -14,6 +14,7 @@ DataError, EngineNotRunningError, FireboltDatabaseError, + NotSupportedError, OperationalError, QueryNotRunError, ) @@ -677,3 +678,10 @@ async def test_disallowed_set_parameter(cursor: Cursor, parameter: str) -> None: e.value ), "invalid error" assert cursor._set_parameters == {}, "set parameters should not be updated" + + +async def test_cursor_execute_async_raises(cursor: Cursor) -> None: + """Test that calling execute_async raises an error.""" + with raises(NotSupportedError) as e: + await cursor.execute_async("select 1") + assert "Async execution is not supported" in str(e.value), "invalid error" diff --git a/tests/unit/V1/db/test_cursor.py b/tests/unit/V1/db/test_cursor.py index 4510f5a9710..9382977e8f6 100644 --- a/tests/unit/V1/db/test_cursor.py +++ b/tests/unit/V1/db/test_cursor.py @@ -12,6 +12,7 @@ ConfigurationError, CursorClosedError, DataError, + NotSupportedError, OperationalError, QueryNotRunError, ) @@ -623,3 +624,10 @@ def test_disallowed_set_parameter(cursor: Cursor, parameter: str) -> None: e.value ), "invalid error" assert cursor._set_parameters == {}, 
"set parameters should not be updated" + + +def test_cursor_execute_async_raises(cursor: Cursor) -> None: + """Test that calling execute_async raises an error.""" + with raises(NotSupportedError) as e: + cursor.execute_async("select 1") + assert "Async execution is not supported" in str(e.value), "invalid error" diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 183eb67621c..801f9353962 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -1,4 +1,4 @@ -from typing import Callable, List +from typing import Callable, List, Optional, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -406,3 +406,89 @@ async def test_connect_no_user_agent( ) as connection: await connection.cursor().execute("select*") ut.assert_called_with([], []) + + +@mark.parametrize( + "server_status,expected_running,expected_success", + [ + ("RUNNING", True, None), + ("ENDED_SUCCESSFULLY", False, True), + ("FAILED", False, False), + ("CANCELLED", False, False), + ], +) +async def test_is_async_query_running_success( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, + server_status: str, + expected_running: bool, + expected_success: Optional[bool], +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_data[0][5] = server_status + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + 
account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + assert await connection.is_async_query_running("token") is expected_running + assert await connection.is_async_query_successful("token") is expected_success + + +async def test_async_query_status_unexpected_result( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_status_running_callback = async_query_callback_factory( + [], async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + await connection.is_async_query_running("token") + with raises(FireboltError): + await connection.is_async_query_successful("token") diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index 14ced6794d5..3adbafea2c5 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -13,6 +13,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, OperationalError, ProgrammingError, QueryNotRunError, @@ -772,3 +774,83 @@ def long_query_callback(request: Request, **kwargs) -> Response: assert fast_executed is False, "fast query was not executed" httpx_mock.reset(False) + + +async def verify_async_fetch_not_allowed(cursor: Cursor): + with raises(MethodNotAllowedInAsyncError): + await cursor.fetchall() + with raises(MethodNotAllowedInAsyncError): + await cursor.fetchone() + with raises(MethodNotAllowedInAsyncError): 
+ await cursor.fetchmany() + + +async def test_cursor_execute_async( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2") + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_multiple_queries( + cursor: Cursor, +): + with raises(FireboltError) as e: + await cursor.execute_async("SELECT 2; SELECT 3") + assert "does not support multi-statement" in str(e.value) + + +async def test_cursor_execute_async_parametrised_query( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2 WHERE x = ?", [1]) + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_skip_parsing( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + await cursor.execute_async("SELECT 2; SELECT 3", skip_parsing=True) + await verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +async def test_cursor_execute_async_validate_set_parameters( + cursor: Cursor, +): + with raises(FireboltError) as e: + await cursor.execute_async("SET a = b") + assert "does not support set" in str(e.value) + + +async def test_cursor_execute_async_respects_api_errors( + httpx_mock: HTTPXMock, + async_query_url: str, + cursor: Cursor, +): + httpx_mock.add_callback( + lambda *args, **kwargs: 
Response(status_code=codes.BAD_REQUEST), + url=async_query_url, + ) + with raises(HTTPStatusError): + await cursor.execute_async("SELECT 2") diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index d874079aff8..87c21e3aba7 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -1,6 +1,6 @@ import gc import warnings -from typing import Callable, List +from typing import Callable, List, Optional, Tuple from unittest.mock import patch from pyfakefs.fake_filesystem_unittest import Patcher @@ -424,3 +424,89 @@ def test_connect_no_user_agent( ) as connection: connection.cursor().execute("select*") ut.assert_called_with([], []) + + +@mark.parametrize( + "server_status,expected_running,expected_success", + [ + ("RUNNING", True, None), + ("ENDED_SUCCESSFULLY", False, True), + ("FAILED", False, False), + ("CANCELLED", False, False), + ], +) +def test_is_async_query_running_success( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, + server_status: str, + expected_running: bool, + expected_success: Optional[bool], +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_data[0][5] = server_status + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + assert connection.is_async_query_running("token") is expected_running + assert connection.is_async_query_successful("token") is 
expected_success + + +def test_async_query_status_unexpected_result( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_status_running_callback = async_query_callback_factory( + [], async_query_meta + ) + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + connection.is_async_query_running("token") + with raises(FireboltError): + connection.is_async_query_successful("token") diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index 4f4a5c60ea6..a7a9b2333d2 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -13,6 +13,8 @@ ConfigurationError, CursorClosedError, DataError, + FireboltError, + MethodNotAllowedInAsyncError, OperationalError, ProgrammingError, QueryNotRunError, @@ -757,3 +759,83 @@ def long_query_callback(request: Request, **kwargs) -> Response: assert fast_executed is False, "fast query was not executed" httpx_mock.reset(False) + + +def verify_async_fetch_not_allowed(cursor: Cursor): + with raises(MethodNotAllowedInAsyncError): + cursor.fetchall() + with raises(MethodNotAllowedInAsyncError): + cursor.fetchone() + with raises(MethodNotAllowedInAsyncError): + cursor.fetchmany() + + +def test_cursor_execute_async( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2") + 
verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_multiple_queries( + cursor: Cursor, +): + with raises(FireboltError) as e: + cursor.execute_async("SELECT 2; SELECT 3") + assert "does not support multi-statement" in str(e.value) + + +def test_cursor_execute_async_parametrised_query( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2 WHERE x = ?", [1]) + verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_skip_parsing( + httpx_mock: HTTPXMock, + async_query_callback: Callable, + async_query_url: str, + cursor: Cursor, + async_token: str, +): + httpx_mock.add_callback(async_query_callback, url=async_query_url) + cursor.execute_async("SELECT 2; SELECT 3", skip_parsing=True) + verify_async_fetch_not_allowed(cursor) + assert cursor.async_query_token == async_token + assert cursor._state == CursorState.DONE + + +def test_cursor_execute_async_validate_set_parameters( + cursor: Cursor, +): + with raises(FireboltError) as e: + cursor.execute_async("SET a = b") + assert "does not support set" in str(e.value) + + +def test_cursor_execute_async_respects_api_errors( + httpx_mock: HTTPXMock, + async_query_url: str, + cursor: Cursor, +): + httpx_mock.add_callback( + lambda *args, **kwargs: Response(status_code=codes.BAD_REQUEST), + url=async_query_url, + ) + with raises(HTTPStatusError): + cursor.execute_async("SELECT 2") diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index f1e20b9c495..cb58fee7d1a 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -1,7 +1,7 @@ from datetime import date, datetime from decimal import Decimal from json 
import dumps as jdumps -from typing import Any, Callable, Dict, List +from typing import Any, Callable, Dict, List, Tuple from httpx import URL, Request, codes from pytest import fixture @@ -246,6 +246,18 @@ def query_url(engine_url: str, db_name: str) -> URL: ) +@fixture +def async_query_url(engine_url: str, db_name: str) -> URL: + return URL( + f"https://{engine_url}/", + params={ + "output_format": JSON_OUTPUT_FORMAT, + "database": db_name, + "async": "true", + }, + ) + + @fixture def query_url_updated(engine_url: str, db_name_updated: str) -> URL: return URL( @@ -523,3 +535,110 @@ def types_map() -> Dict[str, type]: **struct, **nested_struct, } + + +@fixture +def async_query_data() -> List[List[ColType]]: + query_data = [ + [ + "developer", + "ecosystem_ci", + "2025-01-23 14:08:06.087953+00", + "2025-01-23 14:08:06.134208+00", + "2025-01-23 14:08:06.410542+00", + "ENDED_SUCCESSFULLY", + "db4c7542-3058-4e2a-9d49-ae5ea2da3cbe", + "f9520387-224c-48e9-9858-b2d05518ce94", + "", + "2", + "2", + "0", + ] + ] + return query_data + + +@fixture +def async_query_meta() -> List[Tuple[str, str]]: + query_meta = [ + ("account_name", "text null"), + ("user_name", "text null"), + ("submitted_time", "timestamptz null"), + ("start_time", "timestamptz null"), + ("end_time", "timestamptz null"), + ("status", "text null"), + ("request_id", "text null"), + ("query_id", "text null"), + ("error_message", "text null"), + ("scanned_bytes", "long null"), + ("scanned_rows", "long null"), + ("retries", "long null"), + ] + return query_meta + + +@fixture +def async_query_callback_factory( + query_statistics: Dict[str, Any], +) -> Callable: + def create_callback( + data: List[List[ColType]], meta: List[Tuple[str, str]] + ) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + query_response = { + "meta": [{"name": c[0], "type": c[1]} for c in 
meta], + "data": data, + "rows": len(data), + "statistics": query_statistics, + } + return Response(status_code=codes.OK, json=query_response) + + return do_query + + return create_callback + + +@fixture +def async_query_status_running_callback( + query_statistics: Dict[str, Any], + query_data: List[List[ColType]], + query_meta: List[Tuple[str, str]], +) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + query_response = { + "meta": [{"name": c[0], "type": c[1]} for c in query_meta], + "data": query_data, + "rows": len(query_data), + "statistics": query_statistics, + } + return Response(status_code=codes.OK, json=query_response) + + return do_query + + +@fixture +def async_token() -> str: + return "async_token" + + +@fixture +def async_query_callback(async_token: str) -> Callable: + def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) + assert "async=true" in str(request.url) + query_response = { + "message": "the query was accepted for async processing", + "monitorSql": "CALL fb_GetAsyncStatus('token');", + "token": async_token, + } + return Response(status_code=codes.ACCEPTED, json=query_response) + + return do_query