diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index a25f94e460..8a07e9d1c9 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -610,87 +610,6 @@ Note, however, that it is not possible to retrieve the results of a server-side query, so these queries are best used for running DMLs and DDLs and ``SELECT``\ s should be used only for warming the cache. -Executing asynchronous DDL commands ------------------------------------- - -.. _ddl_execution_example: - -Executing queries server-side asynchronously is similar to executing server-side synchronous -queries, but the ``execute()`` command receives an extra parameter, ``async_execution=True``. -The example below uses ``cursor`` to create a new table called ``test_table``. -``execute(query, async_execution=True)`` will return a query ID, which can subsequently -be used to check the query status. - -:: - - query_id = cursor.execute( - """ - CREATE FACT TABLE IF NOT EXISTS test_table ( - id INT, - name TEXT - ) - PRIMARY INDEX id; - """, - async_execution=True - ) - - -To check the status of a query, send the query ID to ```get_status()``` to receive a -QueryStatus enumeration object. Possible statuses are: - - - * ``RUNNING`` - * ``ENDED_SUCCESSFULLY`` - * ``ENDED_UNSUCCESSFULLY`` - * ``NOT_READY`` - * ``STARTED_EXECUTION`` - * ``PARSE_ERROR`` - * ``CANCELED_EXECUTION`` - * ``EXECUTION_ERROR`` - - -Once the status of the table creation is ``ENDED_SUCCESSFULLY``, data can be inserted into it: - -:: - - from firebolt.async_db.cursor import QueryStatus - - query_status = cursor.get_status(query_id) - - if query_status == QueryStatus.ENDED_SUCCESSFULLY: - cursor.execute( - """ - INSERT INTO test_table VALUES - (1, 'hello'), - (2, 'world'), - (3, '!'); - """ - ) - - -In addition, server-side asynchronous queries can be cancelled calling ``cancel()``. - -:: - - query_id = cursor.execute( - """ - CREATE FACT TABLE IF NOT EXISTS test_table ( - id INT, - name TEXT - ) - PRIMARY INDEX id; - """, - async_execution=True - ) - - cursor.cancel(query_id) - - query_status = cursor.get_status(query_id) - - print(query_status) - -**Returns**: ``CANCELED_EXECUTION`` - Thread safety ============================== diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index c44710a46d..60bf7ef30d 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import re import time from abc import ABCMeta, abstractmethod from functools import wraps @@ -35,7 +34,6 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - QueryStatus, Statistics, _parse_update_endpoint, _parse_update_parameters, @@ -45,7 +43,6 @@ ) from firebolt.common.constants import ENGINE_STATUS_RUNNING_LIST from firebolt.utils.exception import ( - AsyncExecutionUnavailableError, EngineNotRunningError, FireboltDatabaseError, FireboltEngineError, @@ -57,7 +54,7 @@ if TYPE_CHECKING: from firebolt.async_db.connection import Connection -from firebolt.utils.util import Timer, _print_error_body +from firebolt.utils.util import _print_error_body logger = logging.getLogger(__name__) @@ -126,12 +123,6 @@ async def _raise_if_error(self, resp: Response) -> None: async def _validate_set_parameter(self, parameter: SetParameter) -> None: """Validate parameter by executing simple query with it.""" _raise_if_internal_set_parameter(parameter) - if parameter.name == "async_execution": - raise AsyncExecutionUnavailableError( - "It is not possible to set async_execution using a SET command. " - "Instead, pass it as an argument to the execute() or " - "executemany() function." - ) resp = await self._api_request("select 1", {parameter.name: parameter.value}) # Handle invalid set parameter if resp.status_code == codes.BAD_REQUEST: @@ -170,7 +161,6 @@ async def _do_execute( raw_query: str, parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, - async_execution: Optional[bool] = False, ) -> None: self._reset() # Allow users to manually skip parsing for performance improvement. @@ -180,13 +170,7 @@ async def _do_execute( try: for query in queries: start_time = time.time() - # Our CREATE EXTERNAL TABLE queries currently require credentials, - # so we will skip logging those queries. - # https://docs.firebolt.io/sql-reference/commands/create-external-table.html - if isinstance(query, SetParameter) or not re.search( - "aws_key_id|credentials", query, flags=re.IGNORECASE - ): - logger.debug(f"Running query: {query}") + Cursor._log_query(query) # Define type for mypy row_set: Tuple[ @@ -197,35 +181,6 @@ async def _do_execute( ] = (-1, None, None, None) if isinstance(query, SetParameter): await self._validate_set_parameter(query) - elif async_execution: - self._validate_server_side_async_settings( - parameters, - queries, - skip_parsing, - async_execution, - ) - - with Timer( - f"[PERFORMANCE] Running query {query[:50]} " - f"{'... ' if len(query) > 50 else ''}" - ): - response = await self._api_request( - query, - { - "async_execution": 1, - "output_format": JSON_OUTPUT_FORMAT, - }, - ) - - await self._raise_if_error(response) - if response.headers.get("content-length", "") == "0": - raise OperationalError("No response to asynchronous query.") - resp = response.json() - if "query_id" not in resp or resp["query_id"] == "": - raise OperationalError( - "Invalid response to asynchronous query: missing query_id." - ) - self._query_id = resp["query_id"] else: resp = await self._api_request( query, {"output_format": JSON_OUTPUT_FORMAT} @@ -253,7 +208,6 @@ async def execute( query: str, parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, - async_execution: Optional[bool] = False, ) -> Union[int, str]: """Prepare and execute a database query. @@ -277,21 +231,19 @@ async def execute( skip_parsing (bool): Flag to disable query parsing. This will disable parameterized, multi-statement and SET queries, while improving performance - async_execution (bool): flag to determine if query should be asynchronous Returns: int: Query row count. """ params_list = [parameters] if parameters else [] - await self._do_execute(query, params_list, skip_parsing, async_execution) - return self.query_id if async_execution else self.rowcount + await self._do_execute(query, params_list, skip_parsing) + return self.rowcount @check_not_closed async def executemany( self, query: str, parameters_seq: Sequence[Sequence[ParameterType]], - async_execution: Optional[bool] = False, ) -> Union[int, str]: """Prepare and execute a database query. @@ -316,46 +268,12 @@ async def executemany( substitution parameter sets. Used to replace '?' placeholders inside a query with actual values from each set in a sequence. Resulting queries for each subset are executed sequentially. - async_execution (bool): flag to determine if query should be asynchronous Returns: - int|str: Query row count for synchronous execution of queries, - query ID string for asynchronous execution. + int: Query row count. """ - await self._do_execute(query, parameters_seq, async_execution=async_execution) - if async_execution: - return self.query_id - else: - return self.rowcount - - @check_not_closed - async def get_status(self, query_id: str) -> QueryStatus: - """Get status of a server-side async query. Return the state of the query.""" - try: - resp = await self._api_request( - # output_format must be empty for status to work correctly. - # And set parameters will cause 400 errors. - parameters={"query_id": query_id}, - path="status", - use_set_parameters=False, - ) - if resp.status_code == codes.BAD_REQUEST: - raise OperationalError( - f"Asynchronous query {query_id} status check failed: " - f"{resp.status_code}." - ) - resp_json = resp.json() - if "status" not in resp_json: - raise OperationalError( - "Invalid response to asynchronous query: missing status." - ) - except Exception: - self._state = CursorState.ERROR - raise - # Remember that query_id might be empty. - if resp_json["status"] == "": - return QueryStatus.NOT_READY - return QueryStatus[resp_json["status"]] + await self._do_execute(query, parameters_seq) + return self.rowcount @abstractmethod async def is_db_available(self, database: str) -> bool: @@ -389,15 +307,6 @@ async def fetchall(self) -> List[List[ColType]]: async def nextset(self) -> None: return super().nextset() - @check_not_closed - async def cancel(self, query_id: str) -> None: - """Cancel a server-side async query.""" - await self._api_request( - parameters={"query_id": query_id}, - path="cancel", - use_set_parameters=False, - ) - # Iteration support @check_not_closed @check_query_executed @@ -557,7 +466,7 @@ def __init__( async def _api_request( self, query: Optional[str] = "", - parameters: Optional[dict[str, Any]] = {}, + parameters: Optional[dict[str, Any]] = None, path: Optional[str] = "", use_set_parameters: Optional[bool] = True, ) -> Response: diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index b56e1e6854..5ddf1d3cba 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -1,25 +1,24 @@ from __future__ import annotations import logging +import re from dataclasses import dataclass, fields from enum import Enum from functools import wraps from types import TracebackType -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from httpx import URL, Response from firebolt.common._types import ( ColType, Column, - ParameterType, RawColType, SetParameter, parse_type, parse_value, ) from firebolt.utils.exception import ( - AsyncExecutionUnavailableError, ConfigurationError, CursorClosedError, DataError, @@ -40,19 +39,6 @@ class CursorState(Enum): CLOSED = 4 -class QueryStatus(Enum): - """Enumeration of query responses on server-side async queries.""" - - RUNNING = 1 - ENDED_SUCCESSFULLY = 2 - ENDED_UNSUCCESSFULLY = 3 - NOT_READY = 4 - STARTED_EXECUTION = 5 - PARSE_ERROR = 6 - CANCELED_EXECUTION = 7 - EXECUTION_ERROR = 8 - - # Parameters that should be set using USE instead of SET USE_PARAMETER_LIST = ["database", "engine"] # parameters that can only be set by the backend @@ -325,6 +311,16 @@ def _update_server_parameters(self, parameters: Dict[str, Any]) -> None: for key, value in parameters.items(): self.parameters[key] = value + @staticmethod + def _log_query(query: Union[str, SetParameter]) -> None: + # Our CREATE EXTERNAL TABLE queries currently require credentials, + # so we will skip logging those queries. + # https://docs.firebolt.io/sql-reference/commands/create-external-table.html + if isinstance(query, SetParameter) or not re.search( + "aws_key_id|credentials", query, flags=re.IGNORECASE + ): + logger.debug(f"Running query: {query}") + @property def engine_name(self) -> str: """ @@ -382,33 +378,6 @@ def _append_row_set( # Populate values for first set self._pop_next_set() - def _validate_server_side_async_settings( - self, - parameters: Sequence[Sequence[ParameterType]], - queries: List[Union[SetParameter, str]], - skip_parsing: bool = False, - async_execution: Optional[bool] = False, - ) -> None: - if async_execution and self._set_parameters.get("use_standard_sql", "1") == "0": - raise AsyncExecutionUnavailableError( - "It is not possible to execute queries asynchronously if " - "use_standard_sql=0." - ) - if parameters and skip_parsing: - logger.warning( - "Query formatting parameters are provided but skip_parsing " - "is specified. They will be ignored." - ) - non_set_queries = 0 - for query in queries: - if type(query) is not SetParameter: - non_set_queries += 1 - if non_set_queries > 1 and async_execution: - raise AsyncExecutionUnavailableError( - "It is not possible to execute multi-statement " - "queries asynchronously." - ) - def _parse_row(self, row: List[RawColType]) -> List[ColType]: """Parse a single data row based on query column types.""" assert len(row) == len(self.description) diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 94978f7414..0b6fcd7216 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import re import time from abc import ABCMeta, abstractmethod from typing import ( @@ -33,7 +32,6 @@ UPDATE_PARAMETERS_HEADER, BaseCursor, CursorState, - QueryStatus, Statistics, _parse_update_endpoint, _parse_update_parameters, @@ -43,7 +41,6 @@ ) from firebolt.common.constants import ENGINE_STATUS_RUNNING_LIST from firebolt.utils.exception import ( - AsyncExecutionUnavailableError, EngineNotRunningError, FireboltDatabaseError, FireboltEngineError, @@ -125,12 +122,6 @@ def _api_request( def _validate_set_parameter(self, parameter: SetParameter) -> None: """Validate parameter by executing simple query with it.""" _raise_if_internal_set_parameter(parameter) - if parameter.name == "async_execution": - raise AsyncExecutionUnavailableError( - "It is not possible to set async_execution using a SET command. " - "Instead, pass it as an argument to the execute() or " - "executemany() function." - ) resp = self._api_request("select 1", {parameter.name: parameter.value}) # Handle invalid set parameter if resp.status_code == codes.BAD_REQUEST: @@ -169,7 +160,6 @@ def _do_execute( raw_query: str, parameters: Sequence[Sequence[ParameterType]], skip_parsing: bool = False, - async_execution: Optional[bool] = False, ) -> None: self._reset() # Allow users to manually skip parsing for performance improvement. @@ -179,13 +169,8 @@ def _do_execute( try: for query in queries: start_time = time.time() - # Our CREATE EXTERNAL TABLE queries currently require credentials, - # so we will skip logging those queries. - # https://docs.firebolt.io/sql-reference/commands/create-external-table.html - if isinstance(query, SetParameter) or not re.search( - "aws_key_id|credentials", query, flags=re.IGNORECASE - ): - logger.debug(f"Running query: {query}") + + Cursor._log_query(query) # Define type for mypy row_set: Tuple[ @@ -196,29 +181,6 @@ def _do_execute( ] = (-1, None, None, None) if isinstance(query, SetParameter): self._validate_set_parameter(query) - elif async_execution: - self._validate_server_side_async_settings( - parameters, - queries, - skip_parsing, - async_execution, - ) - response = self._api_request( - query, - { - "async_execution": 1, - "output_format": JSON_OUTPUT_FORMAT, - }, - ) - self._raise_if_error(response) - if response.headers.get("content-length", "") == "0": - raise OperationalError("No response to asynchronous query.") - resp = response.json() - if "query_id" not in resp or resp["query_id"] == "": - raise OperationalError( - "Invalid response to asynchronous query: missing query_id." - ) - self._query_id = resp["query_id"] else: resp = self._api_request( query, {"output_format": JSON_OUTPUT_FORMAT} @@ -246,7 +208,6 @@ def execute( query: str, parameters: Optional[Sequence[ParameterType]] = None, skip_parsing: bool = False, - async_execution: Optional[bool] = False, ) -> Union[int, str]: """Prepare and execute a database query. @@ -270,21 +231,17 @@ def execute( skip_parsing (bool): Flag to disable query parsing. This will disable parameterized, multi-statement and SET queries, while improving performance - async_execution (bool): flag to determine if query should be asynchronous Returns: int: Query row count. """ params_list = [parameters] if parameters else [] - self._do_execute(query, params_list, skip_parsing, async_execution) - return self.query_id if async_execution else self.rowcount + self._do_execute(query, params_list, skip_parsing) + return self.rowcount @check_not_closed def executemany( - self, - query: str, - parameters_seq: Sequence[Sequence[ParameterType]], - async_execution: Optional[bool] = False, + self, query: str, parameters_seq: Sequence[Sequence[ParameterType]] ) -> Union[int, str]: """Prepare and execute a database query. @@ -309,46 +266,12 @@ def executemany( substitution parameter sets. Used to replace '?' placeholders inside a query with actual values from each set in a sequence. Resulting queries for each subset are executed sequentially. - async_execution (bool): flag to determine if query should be asynchronous Returns: - int|str: Query row count for synchronous execution of queries, - query ID string for asynchronous execution. + int: Query row count. """ - self._do_execute(query, parameters_seq, async_execution=async_execution) - if async_execution: - return self.query_id - else: - return self.rowcount - - @check_not_closed - def get_status(self, query_id: str) -> QueryStatus: - """Get status of a server-side async query. Return the state of the query.""" - try: - resp = self._api_request( - # output_format must be empty for status to work correctly. - # And set parameters will cause 400 errors. - parameters={"query_id": query_id}, - path="status", - use_set_parameters=False, - ) - if resp.status_code == codes.BAD_REQUEST: - raise OperationalError( - f"Asynchronous query {query_id} status check failed: " - f"{resp.status_code}." - ) - resp_json = resp.json() - if "status" not in resp_json: - raise OperationalError( - "Invalid response to asynchronous query: missing status." - ) - except Exception: - self._state = CursorState.ERROR - raise - # Remember that query_id might be empty. - if resp_json["status"] == "": - return QueryStatus.NOT_READY - return QueryStatus[resp_json["status"]] + self._do_execute(query, parameters_seq) + return self.rowcount @abstractmethod def is_db_available(self, database: str) -> bool: @@ -360,15 +283,6 @@ def is_engine_running(self, engine_url: str) -> bool: """Verify that the engine is running.""" ... - @check_not_closed - def cancel(self, query_id: str) -> None: - """Cancel a server-side async query.""" - self._api_request( - parameters={"query_id": query_id}, - path="cancel", - use_set_parameters=False, - ) - # Iteration support @check_not_closed @check_query_executed @@ -498,7 +412,7 @@ def __init__( def _api_request( self, query: Optional[str] = "", - parameters: Optional[dict[str, Any]] = {}, + parameters: Optional[dict[str, Any]] = None, path: Optional[str] = "", use_set_parameters: Optional[bool] = True, ) -> Response: diff --git a/src/firebolt/utils/exception.py b/src/firebolt/utils/exception.py index bb9877941a..b3e62a06a6 100644 --- a/src/firebolt/utils/exception.py +++ b/src/firebolt/utils/exception.py @@ -267,14 +267,3 @@ class NotSupportedError(DatabaseError): class ConfigurationError(InterfaceError): """Invalid configuration error.""" - - -class AsyncExecutionUnavailableError(ProgrammingError): - """ - If `use_standard_sql` is specified the query status endpoint returns a JSON - object with empty values instead of a proper status object. In that case, - it is not possible to retrieve the results of an asynchronous query. - """ - - def __init__(self, error_message): # type: ignore - super().__init__(error_message) diff --git a/tests/integration/dbapi/async/V1/test_queries_async.py b/tests/integration/dbapi/async/V1/test_queries_async.py index 8dfcd1b730..d459f08d63 100644 --- a/tests/integration/dbapi/async/V1/test_queries_async.py +++ b/tests/integration/dbapi/async/V1/test_queries_async.py @@ -7,7 +7,6 @@ from pytest import fixture, mark, raises from firebolt.async_db import Binary, Connection, Cursor, OperationalError -from firebolt.async_db.cursor import QueryStatus from firebolt.common._types import ColType, Column VALS_TO_INSERT_2 = ",".join( @@ -71,29 +70,6 @@ def assert_deep_eq(got: Any, expected: Any, msg: str) -> bool: ), f"{msg}: {got}(got) != {expected}(expected)" -async def status_loop( - query_id: str, - query: str, - cursor: Cursor, - start_status: QueryStatus = QueryStatus.NOT_READY, - final_status: QueryStatus = QueryStatus.ENDED_SUCCESSFULLY, -) -> None: - """ - Continually check status of asynchronously executed query. Compares - QueryStatus object returned from get_status() to desired final_status. - Used in test_server_side_async_execution_cancel() and - test_server_side_async_execution_get_status(). - """ - status = await cursor.get_status(query_id) - # get_status() will return NOT_READY until it succeeds or fails. - while status == start_status or status == QueryStatus.NOT_READY: - # This only checks to see if a correct response is returned - status = await cursor.get_status(query_id) - assert ( - status == final_status - ), f"Failed {query}. Got {status} rather than {final_status}." - - async def test_connect_engine_name( connection_engine_name: Connection, all_types_query: str, @@ -423,60 +399,6 @@ async def test_set_invalid_parameter(connection: Connection): assert len(c._set_parameters) == 0 -async def test_server_side_async_execution_query(connection: Connection) -> None: - """Make an sql query and receive an id back.""" - with connection.cursor() as c: - query_id = await c.execute("SELECT 1", [], async_execution=True) - assert ( - query_id and type(query_id) is str - ), "Invalid query id was returned from server-side async query." - - -@mark.skip( - reason="Can't get consistently slow queries so fails significant portion of time." -) -async def test_server_side_async_execution_cancel( - create_server_side_test_table_setup_teardown_async, -) -> None: - """Test cancel.""" - c = create_server_side_test_table_setup_teardown_async - await c.execute(LONG_INSERT, async_execution=True) - # Cancel, then check that status is cancelled. - await c.cancel(query_id) - await status_loop( - query_id, - "cancel", - c, - start_status=QueryStatus.STARTED_EXECUTION, - final_status=QueryStatus.CANCELED_EXECUTION, - ) - - -@mark.skip( - reason=( - "Can't get consistently slow queries so fails significant portion of time. " - "get_status() always returns a QueryStatus object, so this assertion will " - "always pass. Error condition of invalid status is caught in get_status()." - ) -) -async def test_server_side_async_execution_get_status( - create_server_side_test_table_setup_teardown_async, -) -> None: - """ - Test get_status(). Test for three ending conditions: Simply test to see - that a StatusQuery object is returned. Queries are succeeding too quickly - to be able to check for specific status states. - """ - c = create_server_side_test_table_setup_teardown_async - query_id = await c.execute(LONG_INSERT, async_execution=True) - await c.get_status(query_id) - # Commented out assert because I was getting warnig errors about it being - # always true even when this should be skipping. - # assert ( - # type(status) is QueryStatus, - # ), "get_status() did not return a QueryStatus object." - - async def test_bytea_roundtrip( connection: Connection, ) -> None: diff --git a/tests/integration/dbapi/async/V2/test_queries_async.py b/tests/integration/dbapi/async/V2/test_queries_async.py index 3f683ac00f..8e6bcc7d2c 100644 --- a/tests/integration/dbapi/async/V2/test_queries_async.py +++ b/tests/integration/dbapi/async/V2/test_queries_async.py @@ -9,7 +9,6 @@ from firebolt.async_db import Binary, Connection, Cursor, OperationalError from firebolt.async_db.connection import connect -from firebolt.async_db.cursor import QueryStatus from firebolt.client.auth.base import Auth from firebolt.common._types import ColType, Column from tests.integration.conftest import API_ENDPOINT_ENV @@ -21,29 +20,6 @@ LONG_INSERT = f"INSERT INTO test_tbl VALUES {VALS_TO_INSERT_2}" -async def status_loop( - query_id: str, - query: str, - cursor: Cursor, - start_status: QueryStatus = QueryStatus.NOT_READY, - final_status: QueryStatus = QueryStatus.ENDED_SUCCESSFULLY, -) -> None: - """ - Continually check status of asynchronously executed query. Compares - QueryStatus object returned from get_status() to desired final_status. - Used in test_server_side_async_execution_cancel() and - test_server_side_async_execution_get_status(). - """ - status = await cursor.get_status(query_id) - # get_status() will return NOT_READY until it succeeds or fails. - while status == start_status or status == QueryStatus.NOT_READY: - # This only checks to see if a correct response is returned - status = await cursor.get_status(query_id) - assert ( - status == final_status - ), f"Failed {query}. Got {status} rather than {final_status}." - - async def test_connect_no_db( connection_no_db: Connection, all_types_query: str, @@ -352,60 +328,6 @@ async def test_set_invalid_parameter(connection: Connection): assert len(c._set_parameters) == 0 -async def test_server_side_async_execution_query(connection: Connection) -> None: - """Make an sql query and receive an id back.""" - with connection.cursor() as c: - query_id = await c.execute("SELECT 1", [], async_execution=True) - assert ( - query_id and type(query_id) is str - ), "Invalid query id was returned from server-side async query." - - -@mark.skip( - reason="Can't get consistently slow queries so fails significant portion of time." -) -async def test_server_side_async_execution_cancel( - create_server_side_test_table_setup_teardown_async, -) -> None: - """Test cancel.""" - c = create_server_side_test_table_setup_teardown_async - await c.execute(LONG_INSERT, async_execution=True) - # Cancel, then check that status is cancelled. - await c.cancel(query_id) - await status_loop( - query_id, - "cancel", - c, - start_status=QueryStatus.STARTED_EXECUTION, - final_status=QueryStatus.CANCELED_EXECUTION, - ) - - -@mark.skip( - reason=( - "Can't get consistently slow queries so fails significant portion of time. " - "get_status() always returns a QueryStatus object, so this assertion will " - "always pass. Error condition of invalid status is caught in get_status()." - ) -) -async def test_server_side_async_execution_get_status( - create_server_side_test_table_setup_teardown_async, -) -> None: - """ - Test get_status(). Test for three ending conditions: Simply test to see - that a StatusQuery object is returned. Queries are succeeding too quickly - to be able to check for specific status states. - """ - c = create_server_side_test_table_setup_teardown_async - query_id = await c.execute(LONG_INSERT, async_execution=True) - await c.get_status(query_id) - # Commented out assert because I was getting warnig errors about it being - # always true even when this should be skipping. - # assert ( - # type(status) is QueryStatus, - # ), "get_status() did not return a QueryStatus object." - - async def test_bytea_roundtrip( connection: Connection, ) -> None: diff --git a/tests/integration/dbapi/sync/V1/test_queries.py b/tests/integration/dbapi/sync/V1/test_queries.py index 1ac61eb262..3daf23ae3c 100644 --- a/tests/integration/dbapi/sync/V1/test_queries.py +++ b/tests/integration/dbapi/sync/V1/test_queries.py @@ -6,7 +6,6 @@ from pytest import fixture, mark, raises -from firebolt.async_db.cursor import QueryStatus from firebolt.client.auth import Auth from firebolt.common._types import ColType, Column from firebolt.db import Binary, Connection, Cursor, OperationalError, connect @@ -23,29 +22,6 @@ def assert_deep_eq(got: Any, expected: Any, msg: str) -> bool: ), f"{msg}: {got}(got) != {expected}(expected)" -def status_loop( - query_id: str, - query: str, - cursor: Cursor, - start_status: QueryStatus = QueryStatus.NOT_READY, - final_status: QueryStatus = QueryStatus.ENDED_SUCCESSFULLY, -) -> None: - """ - Continually check status of asynchronously executed query. Compares - QueryStatus object returned from get_status() to desired final_status. - Used in test_server_side_async_execution_cancel() and - test_server_side_async_execution_get_status(). - """ - status = cursor.get_status(query_id) - # get_status() will return NOT_READY until it succeeds or fails. - while status == start_status or status == QueryStatus.NOT_READY: - # This only checks to see if a correct response is returned - status = cursor.get_status(query_id) - assert ( - status == final_status - ), f"Failed {query}. Got {status} rather than {final_status}." - - def test_connect_engine_name( connection_engine_name: Connection, all_types_query: str, @@ -409,55 +385,6 @@ def run_queries_parallel() -> None: assert len(exceptions) == 0, exceptions -def test_server_side_async_execution_query(connection: Connection) -> None: - """Make an sql query and receive an id back.""" - with connection.cursor() as c: - query_id = c.execute("SELECT 1", [], async_execution=True) - assert ( - query_id and type(query_id) is str - ), "Invalid query id was returned from server-side async query." - - -@mark.skip( - reason="Can't get consistently slow queries so fails significant portion of time." -) -async def test_server_side_async_execution_cancel( - create_server_side_test_table_setup_teardown, -) -> None: - """Test cancel().""" - c = create_server_side_test_table_setup_teardown - # Cancel, then check that status is cancelled. - c.cancel(query_id) - status_loop( - query_id, - "cancel", - c, - start_status=QueryStatus.STARTED_EXECUTION, - final_status=QueryStatus.CANCELED_EXECUTION, - ) - - -@mark.skip( - reason=( - "Can't get consistently slow queries so fails significant portion of time. " - "get_status() always returns a QueryStatus object, so this assertion will " - "always pass. Error condition of invalid status is caught in get_status()." - ) -) -async def test_server_side_async_execution_get_status( - create_server_side_test_table_setup_teardown, -) -> None: - """Test get_status().""" - c = create_server_side_test_table_setup_teardown - query_id = c.execute(LONG_INSERT, async_execution=True) - status = c.get_status(query_id) - # Commented out assert because I was getting warnig errors about it being - # always true even when this should be skipping. - # assert ( - # type(status) is QueryStatus, - # ), "get_status() did not return a QueryStatus object." - - @mark.xdist_group("multi_thread_connection_sharing") def test_multi_thread_connection_sharing( engine_url: str, diff --git a/tests/integration/dbapi/sync/V2/test_queries.py b/tests/integration/dbapi/sync/V2/test_queries.py index 1dc18ed8f6..6e4362c982 100644 --- a/tests/integration/dbapi/sync/V2/test_queries.py +++ b/tests/integration/dbapi/sync/V2/test_queries.py @@ -8,7 +8,6 @@ from pytest import mark, raises -from firebolt.async_db.cursor import QueryStatus from firebolt.client.auth import Auth from firebolt.common._types import ColType, Column from firebolt.db import Binary, Connection, Cursor, OperationalError, connect @@ -27,29 +26,6 @@ def assert_deep_eq(got: Any, expected: Any, msg: str) -> bool: ), f"{msg}: {got}(got) != {expected}(expected)" -def status_loop( - query_id: str, - query: str, - cursor: Cursor, - start_status: QueryStatus = QueryStatus.NOT_READY, - final_status: QueryStatus = QueryStatus.ENDED_SUCCESSFULLY, -) -> None: - """ - Continually check status of asynchronously executed query. Compares - QueryStatus object returned from get_status() to desired final_status. - Used in test_server_side_async_execution_cancel() and - test_server_side_async_execution_get_status(). - """ - status = cursor.get_status(query_id) - # get_status() will return NOT_READY until it succeeds or fails. - while status == start_status or status == QueryStatus.NOT_READY: - # This only checks to see if a correct response is returned - status = cursor.get_status(query_id) - assert ( - status == final_status - ), f"Failed {query}. Got {status} rather than {final_status}." - - def test_connect_no_db( connection_no_db: Connection, all_types_query: str, @@ -395,55 +371,6 @@ def run_queries_parallel() -> None: assert len(exceptions) == 0, exceptions -def test_server_side_async_execution_query(connection: Connection) -> None: - """Make an sql query and receive an id back.""" - with connection.cursor() as c: - query_id = c.execute("SELECT 1", [], async_execution=True) - assert ( - query_id and type(query_id) is str - ), "Invalid query id was returned from server-side async query." - - -@mark.skip( - reason="Can't get consistently slow queries so fails significant portion of time." -) -async def test_server_side_async_execution_cancel( - create_server_side_test_table_setup_teardown, -) -> None: - """Test cancel().""" - c = create_server_side_test_table_setup_teardown - # Cancel, then check that status is cancelled. - c.cancel(query_id) - status_loop( - query_id, - "cancel", - c, - start_status=QueryStatus.STARTED_EXECUTION, - final_status=QueryStatus.CANCELED_EXECUTION, - ) - - -@mark.skip( - reason=( - "Can't get consistently slow queries so fails significant portion of time. " - "get_status() always returns a QueryStatus object, so this assertion will " - "always pass. Error condition of invalid status is caught in get_status()." - ) -) -async def test_server_side_async_execution_get_status( - create_server_side_test_table_setup_teardown, -) -> None: - """Test get_status().""" - c = create_server_side_test_table_setup_teardown - query_id = c.execute(LONG_INSERT, async_execution=True) - status = c.get_status(query_id) - # Commented out assert because I was getting warnig errors about it being - # always true even when this should be skipping. - # assert ( - # type(status) is QueryStatus, - # ), "get_status() did not return a QueryStatus object." - - @mark.xdist_group("multi_thread_connection_sharing") def test_multi_thread_connection_sharing( engine_name: str, diff --git a/tests/unit/async_db/V1/test_cursor.py b/tests/unit/async_db/V1/test_cursor.py index 5741652a05..1836569cee 100644 --- a/tests/unit/async_db/V1/test_cursor.py +++ b/tests/unit/async_db/V1/test_cursor.py @@ -7,9 +7,8 @@ from firebolt.async_db import Cursor from firebolt.common._types import Column -from firebolt.common.base_cursor import ColType, CursorState, QueryStatus +from firebolt.common.base_cursor import ColType, CursorState from firebolt.utils.exception import ( - AsyncExecutionUnavailableError, ConfigurationError, CursorClosedError, DataError, @@ -101,7 +100,6 @@ async def test_cursor_no_query( auth_url: str, query_callback: Callable, query_url: str, - server_side_async_id: str, cursor: Cursor, ): """Some cursor methods are unavailable until a query is run.""" @@ -309,90 +307,6 @@ def http_error(*args, **kwargs): httpx_mock.reset(True) -async def test_cursor_server_side_async_execute_errors( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - query_with_params_url: str, - server_side_async_missing_id_callback: Callable, - insert_query_callback: str, - cursor: Cursor, -): - """ - Cursor handles all types of errors properly using server-side - async queries. - """ - for query, message in ( - ( - lambda sql: cursor.execute(sql, async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda sql: cursor.executemany(sql, [], async_execution=True), - "server-side asynchronous executemany()", - ), - ): - # Empty server-side asynchronous execution return. - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback(insert_query_callback, url=query_with_params_url) - with raises(OperationalError) as excinfo: - await query("SELECT * FROM t") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ("No response to asynchronous query.") - - # Missing query_id from server-side asynchronous execution. - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_missing_id_callback, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - await query("SELECT * FROM t") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "Invalid response to asynchronous query: missing query_id." - ) - - # Multi-statement queries are not possible with async_execution error. - httpx_mock.add_callback(auth_callback, url=auth_url) - with raises(AsyncExecutionUnavailableError) as excinfo: - await query("SELECT * FROM t; SELECT * FROM s") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to execute multi-statement queries asynchronously." - ), f"Multi-statement query was allowed for {message}." - - # Error out if async_execution is set via SET statement. - with raises(AsyncExecutionUnavailableError) as excinfo: - await cursor.execute("SET async_execution=1") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to set async_execution using a SET command. " - "Instead, pass it as an argument to the execute() or " - "executemany() function." - ), f"async_execution was allowed via a SET parameter on {message}." - - # Error out when doing async_execution and use_standard_sql are off. - with raises(AsyncExecutionUnavailableError) as excinfo: - await cursor.execute( - "SET use_standard_sql=0; SELECT * FROM t", async_execution=True - ) - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to execute queries asynchronously if " - "use_standard_sql=0." - ), f"use_standard_sql=0 was allowed for server-side asynchronous queries on {message}." - - # Have to reauth or next execute fails. Not sure why. - httpx_mock.add_callback(auth_callback, url=auth_url) - await cursor.execute("set use_standard_sql=1") - httpx_mock.reset(True) - - async def test_cursor_fetchone( httpx_mock: HTTPXMock, auth_callback: Callable, @@ -700,141 +614,6 @@ async def test_cursor_skip_parse( split_format_sql_mock.assert_not_called() -async def test_cursor_server_side_async_execute( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_id_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - for query, message in ( - ( - lambda: cursor.execute("select * from t", async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda: cursor.executemany( - "select * from t", parameters_seq=[], async_execution=True - ), - "server-side asynchronous executemany()", - ), - ): - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_id_callback, url=query_with_params_url - ) - - assert ( - await query() == server_side_async_id - ), f"Invalid query id returned for {message}." - assert ( - cursor.rowcount == -1 - ), f"Invalid rowcount value for insert using {message}." - assert ( - cursor.description is None - ), f"Invalid description for insert using {message}." - - -async def test_cursor_server_side_async_cancel( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_cancel_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to cancel query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_cancel_callback, url=query_with_params_url - ) - await cursor.cancel(server_side_async_id) - - -async def test_cursor_server_side_async_get_status_completed( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_callback, url=query_with_params_url - ) - status = await cursor.get_status(server_side_async_id) - assert status == QueryStatus.ENDED_SUCCESSFULLY - - -async def test_cursor_server_side_async_get_status_not_yet_available( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_not_yet_availabe_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_not_yet_availabe_callback, - url=query_with_params_url, - ) - status = await cursor.get_status(server_side_async_id) - assert status == QueryStatus.NOT_READY - - -async def test_cursor_server_side_async_get_status_error( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_error: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ """ - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_error, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - await cursor.get_status(server_side_async_id) - - assert cursor._state == CursorState.ERROR - assert ( - str(excinfo.value) - == f"Asynchronous query {server_side_async_id} status check failed." - ), f"Invalid get_status error message." - - async def test_server_side_header_database( httpx_mock: HTTPXMock, auth_callback: Callable, diff --git a/tests/unit/async_db/V2/test_cursor.py b/tests/unit/async_db/V2/test_cursor.py index f3542e3ed5..cd2528c678 100644 --- a/tests/unit/async_db/V2/test_cursor.py +++ b/tests/unit/async_db/V2/test_cursor.py @@ -7,9 +7,8 @@ from firebolt.async_db import Cursor from firebolt.common._types import Column -from firebolt.common.base_cursor import ColType, CursorState, QueryStatus +from firebolt.common.base_cursor import ColType, CursorState from firebolt.utils.exception import ( - AsyncExecutionUnavailableError, ConfigurationError, CursorClosedError, DataError, @@ -356,83 +355,6 @@ def http_error(*args, **kwargs): httpx_mock.reset(True) -async def test_cursor_server_side_async_execute_errors( - httpx_mock: HTTPXMock, - query_with_params_url: str, - server_side_async_missing_id_callback: Callable, - insert_query_callback: str, - cursor: Cursor, -): - """ - Cursor handles all types of errors properly using server-side - async queries. - """ - for query, message in ( - ( - lambda sql: cursor.execute(sql, async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda sql: cursor.executemany(sql, [], async_execution=True), - "server-side asynchronous executemany()", - ), - ): - httpx_mock.add_callback(insert_query_callback, url=query_with_params_url) - with raises(OperationalError) as excinfo: - await query("SELECT * FROM t") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ("No response to asynchronous query.") - - # Missing query_id from server-side asynchronous execution. - httpx_mock.add_callback( - server_side_async_missing_id_callback, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - await query("SELECT * FROM t") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "Invalid response to asynchronous query: missing query_id." - ) - - # Multi-statement queries are not possible with async_execution error. - with raises(AsyncExecutionUnavailableError) as excinfo: - await query("SELECT * FROM t; SELECT * FROM s") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to execute multi-statement queries asynchronously." - ), f"Multi-statement query was allowed for {message}." - - # Error out if async_execution is set via SET statement. - with raises(AsyncExecutionUnavailableError) as excinfo: - await cursor.execute("SET async_execution=1") - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to set async_execution using a SET command. " - "Instead, pass it as an argument to the execute() or " - "executemany() function." - ), f"async_execution was allowed via a SET parameter on {message}." - - # Error out when doing async_execution and use_standard_sql are off. - with raises(AsyncExecutionUnavailableError) as excinfo: - await cursor.execute( - "SET use_standard_sql=0; SELECT * FROM t", async_execution=True - ) - - assert cursor._state == CursorState.ERROR - assert str(excinfo.value) == ( - "It is not possible to execute queries asynchronously if " - "use_standard_sql=0." - ), f"use_standard_sql=0 was allowed for server-side asynchronous queries on {message}." - - # Have to reauth or next execute fails. Not sure why. - await cursor.execute("set use_standard_sql=1") - httpx_mock.reset(True) - - async def test_cursor_fetchone( mock_query: Callable, mock_insert_query: Callable, @@ -708,130 +630,6 @@ async def test_cursor_skip_parse( split_format_sql_mock.assert_not_called() -async def test_cursor_server_side_async_execute( - httpx_mock: HTTPXMock, - server_side_async_id_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - for query, message in ( - ( - lambda: cursor.execute("select * from t", async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda: cursor.executemany( - "select * from t", parameters_seq=[], async_execution=True - ), - "server-side asynchronous executemany()", - ), - ): - # Query with json output - httpx_mock.add_callback( - server_side_async_id_callback, url=query_with_params_url - ) - - assert ( - await query() == server_side_async_id - ), f"Invalid query id returned for {message}." - assert ( - cursor.rowcount == -1 - ), f"Invalid rowcount value for insert using {message}." - assert ( - cursor.description is None - ), f"Invalid description for insert using {message}." - - -async def test_cursor_server_side_async_cancel( - httpx_mock: HTTPXMock, - server_side_async_cancel_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to cancel query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_cancel_callback, url=query_with_params_url - ) - cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} - await cursor.cancel(server_side_async_id) - cursor.close() - with raises(CursorClosedError): - await cursor.cancel(server_side_async_id) - - -async def test_cursor_server_side_async_get_status_completed( - httpx_mock: HTTPXMock, - server_side_async_get_status_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_get_status_callback, url=query_with_params_url - ) - status = await cursor.get_status(server_side_async_id) - assert status == QueryStatus.ENDED_SUCCESSFULLY - - -async def test_cursor_server_side_async_get_status_not_yet_available( - httpx_mock: HTTPXMock, - server_side_async_get_status_not_yet_availabe_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_get_status_not_yet_availabe_callback, - url=query_with_params_url, - ) - status = await cursor.get_status(server_side_async_id) - assert status == QueryStatus.NOT_READY - - -async def test_cursor_server_side_async_get_status_error( - httpx_mock: HTTPXMock, - server_side_async_get_status_error: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ """ - httpx_mock.add_callback( - server_side_async_get_status_error, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - await cursor.get_status(server_side_async_id) - - assert cursor._state == CursorState.ERROR - assert ( - str(excinfo.value) - == f"Asynchronous query {server_side_async_id} status check failed." - ), f"Invalid get_status error message." - - async def test_cursor_iterate( httpx_mock: HTTPXMock, query_callback: Callable, diff --git a/tests/unit/db/V1/test_cursor.py b/tests/unit/db/V1/test_cursor.py index 731c2a6202..c8ef8bbcb0 100644 --- a/tests/unit/db/V1/test_cursor.py +++ b/tests/unit/db/V1/test_cursor.py @@ -6,7 +6,7 @@ from pytest_httpx import HTTPXMock from firebolt.db import Cursor -from firebolt.db.cursor import ColType, Column, CursorState, QueryStatus +from firebolt.db.cursor import ColType, Column, CursorState from firebolt.utils.exception import ( ConfigurationError, CursorClosedError, @@ -560,140 +560,6 @@ def test_cursor_skip_parse( split_format_sql_mock.assert_not_called() -def test_cursor_server_side_async_execute( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_id_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - for query, message in ( - ( - lambda: cursor.execute("select * from t", async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda: cursor.executemany("select * from t", [], async_execution=True), - "server-side asynchronous executemany()", - ), - ): - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_id_callback, url=query_with_params_url - ) - - assert ( - query() == server_side_async_id - ), f"Invalid query id returned for {message}." - assert ( - cursor.rowcount == -1 - ), f"Invalid rowcount value for insert using {message}." - assert ( - cursor.description is None - ), f"Invalid description for insert using {message}." - - -def test_cursor_server_side_async_cancel( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_cancel_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to cancel query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_cancel_callback, url=query_with_params_url - ) - cursor.cancel(server_side_async_id) - - -def test_cursor_server_side_async_get_status_completed( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_callback, url=query_with_params_url - ) - status = cursor.get_status(server_side_async_id) - assert status == QueryStatus.ENDED_SUCCESSFULLY - - -def test_cursor_server_side_async_get_status_not_yet_available( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_not_yet_availabe_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_not_yet_availabe_callback, - url=query_with_params_url, - ) - status = cursor.get_status(server_side_async_id) - assert status == QueryStatus.NOT_READY - - -def test_cursor_server_side_async_get_status_error( - httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, - server_side_async_get_status_error: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ """ - httpx_mock.add_callback(auth_callback, url=auth_url) - httpx_mock.add_callback( - server_side_async_get_status_error, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - cursor.get_status(server_side_async_id) - - assert cursor._state == CursorState.ERROR - assert ( - str(excinfo.value) - == f"Asynchronous query {server_side_async_id} status check failed." - ), f"Invalid get_status error message." - - def test_server_side_header_database( httpx_mock: HTTPXMock, auth_callback: Callable, diff --git a/tests/unit/db/V2/test_cursor.py b/tests/unit/db/V2/test_cursor.py index 3e26dbf08c..3be1e12e64 100644 --- a/tests/unit/db/V2/test_cursor.py +++ b/tests/unit/db/V2/test_cursor.py @@ -6,7 +6,7 @@ from pytest_httpx import HTTPXMock from firebolt.db import Cursor -from firebolt.db.cursor import ColType, Column, CursorState, QueryStatus +from firebolt.db.cursor import ColType, Column, CursorState from firebolt.utils.exception import ( ConfigurationError, CursorClosedError, @@ -613,130 +613,6 @@ def test_cursor_skip_parse( split_format_sql_mock.assert_not_called() -def test_cursor_server_side_async_execute( - httpx_mock: HTTPXMock, - server_side_async_id_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - for query, message in ( - ( - lambda: cursor.execute("select * from t", async_execution=True), - "server-side asynchronous execute()", - ), - ( - lambda: cursor.executemany("select * from t", [], async_execution=True), - "server-side asynchronous executemany()", - ), - ): - - # Query with json output - httpx_mock.add_callback( - server_side_async_id_callback, url=query_with_params_url - ) - - assert ( - query() == server_side_async_id - ), f"Invalid query id returned for {message}." - assert ( - cursor.rowcount == -1 - ), f"Invalid rowcount value for insert using {message}." - assert ( - cursor.description is None - ), f"Invalid description for insert using {message}." - - -def test_cursor_server_side_async_cancel( - httpx_mock: HTTPXMock, - server_side_async_cancel_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to cancel query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_cancel_callback, url=query_with_params_url - ) - cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} - cursor.cancel(server_side_async_id) - - cursor.close() - with raises(CursorClosedError): - cursor.cancel(server_side_async_id) - - -def test_cursor_server_side_async_get_status_completed( - httpx_mock: HTTPXMock, - server_side_async_get_status_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_get_status_callback, url=query_with_params_url - ) - status = cursor.get_status(server_side_async_id) - assert status == QueryStatus.ENDED_SUCCESSFULLY - - -def test_cursor_server_side_async_get_status_not_yet_available( - httpx_mock: HTTPXMock, - server_side_async_get_status_not_yet_availabe_callback: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ - Cursor is able to execute query server-side asynchronously and - query_id is returned. - """ - - # Query with json output - httpx_mock.add_callback( - server_side_async_get_status_not_yet_availabe_callback, - url=query_with_params_url, - ) - status = cursor.get_status(server_side_async_id) - assert status == QueryStatus.NOT_READY - - -def test_cursor_server_side_async_get_status_error( - httpx_mock: HTTPXMock, - server_side_async_get_status_error: Callable, - server_side_async_id: Callable, - query_with_params_url: str, - cursor: Cursor, -): - """ """ - httpx_mock.add_callback( - server_side_async_get_status_error, url=query_with_params_url - ) - with raises(OperationalError) as excinfo: - cursor.get_status(server_side_async_id) - - assert cursor._state == CursorState.ERROR - assert ( - str(excinfo.value) - == f"Asynchronous query {server_side_async_id} status check failed." - ), f"Invalid get_status error message." - - def test_cursor_iterate( httpx_mock: HTTPXMock, query_callback: Callable, diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index 2afc6df9c6..eab3e1da29 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -113,29 +113,6 @@ def python_query_data() -> List[List[ColType]]: ] -@fixture -def server_side_async_id_callback(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - query_response = {"query_id": server_side_async_id} - return Response(status_code=codes.OK, json=query_response) - - return do_query - - -@fixture -def server_side_async_missing_id_callback(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - query_response = {"no_id": server_side_async_id} - return Response(status_code=codes.OK, json=query_response) - - return do_query - - -@fixture -def server_side_async_id() -> str: - return "1a3f53d" - - @fixture def query_statistics() -> Dict[str, Any]: # Just some dummy statistics to have in query response @@ -150,103 +127,6 @@ def query_statistics() -> Dict[str, Any]: } -@fixture -def server_side_async_cancel_callback( - server_side_async_id, query_statistics: Dict[str, Any] -) -> Response: - def do_query(request: Request, **kwargs) -> Response: - # Make sure no set parameters are added - assert sorted(list(request.url.params.keys())) == [ - "database", - "query_id", - ], "invalid query params for async cancel" - assert request.url.path == "/cancel" - # Cancel has no body - assert request.read() == b"" - query_response = { - "meta": [ - {"name": "host", "type": "String"}, - {"name": "port", "type": "UInt16"}, - {"name": "status", "type": "Int64"}, - {"name": "error", "type": "String"}, - {"name": "num_hosts_remaining", "type": "UInt64"}, - {"name": "num_hosts_active", "type": "UInt64"}, - ], - "data": [["node1.node.consul", 9000, 0, "", 0, 0]], - "rows": 1, - "statistics": query_statistics, - } - return Response(status_code=codes.OK, json=query_response) - - return do_query - - -@fixture -def server_side_async_cancel_callback_error(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - return Response(status_code=codes.BAD_REQUEST, json={}) - - return do_query - - -@fixture -def server_side_async_get_status_callback(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - query_response = { - "engine_name": "engine", - "query_id": server_side_async_id, - "status": "ENDED_SUCCESSFULLY", - "query_start_time": "2020-07-31 01:01:01.1234", - "query_duration_ms": 0.104614307, - "original_query": "SELECT 1", - } - return Response(status_code=codes.OK, json=query_response) - - return do_query - - -@fixture -def server_side_async_get_status_not_yet_availabe_callback( - server_side_async_id, -) -> Response: - def do_query(request: Request, **kwargs) -> Response: - query_response = { - "engine_name": "", - "query_id": "", - "status": "", - "query_start_time": "", - "query_duration_ms": "", - "original_query": "", - } - return Response(status_code=codes.OK, json=query_response) - - return do_query - - -@fixture -def server_side_async_get_status_error(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - return Response(status_code=codes.OK, json="") - - return do_query - - -@fixture -def server_side_async_missing_query_id_error(server_side_async_id) -> Response: - def do_query(request: Request, **kwargs) -> Response: - query_response = { - "engine_name": "engine", - "query_id": "", - "status": "ENDED_SUCCESSFULLY", - "query_start_time": "2020-07-31 01:01:01.1234", - "query_duration_ms": 0.104614307, - "original_query": "SELECT 1", - } - return Response(status_code=codes.OK, json=query_response) - - return do_query - - @fixture def query_callback( query_description: List[Column],