diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index b0925bfc001..7b278595a03 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -103,7 +103,6 @@ class BaseCursor: "_rowcount", "_rows", "_idx", - "_idx_lock", "_row_sets", "_next_set_idx", "_set_parameters", diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index adb8bf46e71..5cf64f6d410 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -139,7 +139,9 @@ def __init__( def cursor(self, **kwargs: Any) -> Cursor: if self.closed: - raise ConnectionClosedError("Unable to create cursor: connection closed.") + raise ConnectionClosedError( + "Unable to create cursor: connection closed." # pragma: no mutate + ) with self._closing_lock.gen_rlock(): c = Cursor(client=self._client, connection=self, **kwargs) @@ -168,7 +170,9 @@ def close(self) -> None: # Context manager support def __enter__(self) -> Connection: if self.closed: - raise ConnectionClosedError("Connection is already closed.") + raise ConnectionClosedError( + "Connection is already closed." # pragma: no mutate + ) return self def __exit__( diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index c516d1e8e25..87e3ff960d8 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -3,7 +3,6 @@ import logging import re import time -from threading import Lock from typing import ( TYPE_CHECKING, Any, @@ -16,7 +15,6 @@ ) from httpx import Response, codes -from readerwriterlock.rwlock import RWLockWrite from firebolt.client import Client from firebolt.common._types import ( @@ -65,17 +63,10 @@ class Cursor(BaseCursor): with the :py:func:`fetchmany` method """ - __slots__ = BaseCursor.__slots__ + ( - "_query_lock", - "_idx_lock", - ) - def __init__( self, *args: Any, client: Client, connection: Connection, **kwargs: Any ) -> None: super().__init__(*args, **kwargs) - self._query_lock = RWLockWrite() - self._idx_lock = Lock() self._client = client self.connection = connection @@ -100,7 +91,7 @@ def _raise_if_error(self, resp: Response) -> None: if not is_engine_running(self.connection, self.connection.engine_url): raise EngineNotRunningError( f"Firebolt engine {self.connection.engine_url} " - "needs to be running to run queries against it." + "needs to be running to run queries against it." # pragma: no mutate # noqa: E501 ) resp.raise_for_status() diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index ac63f1f4aa3..0f704597909 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -18,6 +18,13 @@ from firebolt.utils.token_storage import TokenSecureStorage +@mark.skip("__slots__ is broken on Connection class") +async def test_connection_attributes(connection: Connection) -> None: + """Test that no unexpected values can be set. Governed by __slots__""" + with raises(AttributeError): + connection.not_a_database = "dummy" + + async def test_closed_connection(connection: Connection) -> None: """Connection methods are unavailable for closed connection.""" await connection.aclose() @@ -34,13 +41,14 @@ async def test_closed_connection(connection: Connection) -> None: async def test_cursors_closed_on_close(connection: Connection) -> None: """Connection closes all its cursors on close.""" + assert connection.closed == False, "Initial state of connection is incorrect" c1, c2 = connection.cursor(), connection.cursor() assert ( len(connection._cursors) == 2 ), "Invalid number of cursors stored in connection." await connection.aclose() - assert connection.closed, "Connection was not closed on close." + assert connection.closed == True, "Connection was not closed on close." assert c1.closed, "Cursor was not closed on connection close." assert c2.closed, "Cursor was not closed on connection close." assert len(connection._cursors) == 0, "Cursors left in connection after close." @@ -197,6 +205,21 @@ async def test_connection_token_caching( mock_connection_flow() mock_query() + # Using caching by default + with Patcher(): + async with await connect( + database=db_name, + auth=ClientCredentials(client_id, client_secret, use_token_cache=True), + engine_name=engine_name, + account_name=account_name, + api_endpoint=server, + ) as connection: + assert await connection.cursor().execute("select*") == len( + python_query_data + ) + ts = TokenSecureStorage(username=client_id, password=client_secret) + assert ts.get_cached_token() == access_token, "Invalid token value cached" + with Patcher(): async with await connect( database=db_name, diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index c9e02e69a64..95cdb0b6b1c 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -1,4 +1,4 @@ -from typing import Callable, Dict, List +from typing import Any, Callable, Dict, List from unittest.mock import patch from httpx import HTTPStatusError, StreamError, codes @@ -8,12 +8,16 @@ from firebolt.async_db import Cursor from firebolt.common._types import Column from firebolt.common.base_cursor import ColType, CursorState, QueryStatus +from firebolt.common.settings import Settings from firebolt.utils.exception import ( AsyncExecutionUnavailableError, CursorClosedError, DataError, EngineNotRunningError, + FireboltDatabaseError, + FireboltEngineError, OperationalError, + ProgrammingError, QueryNotRunError, ) from tests.unit.db_conftest import encode_param @@ -189,8 +193,11 @@ async def test_cursor_execute( async def test_cursor_execute_error( httpx_mock: HTTPXMock, query_url: str, + get_engines_url: str, + settings: Settings, + db_name: str, + query_statistics: Dict[str, Any], cursor: Cursor, - get_engine_url_not_running_callback: Callable, system_engine_query_url: str, ): """Cursor handles all types of errors properly.""" @@ -240,19 +247,95 @@ def http_error(*args, **kwargs): str(excinfo.value) == "Error executing query:\nQuery error message" ), f"Invalid authentication error message for {message}." + # Database does not exist error + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content="Query error message", + url=query_url, + match_content=b"select * from t", + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltDatabaseError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR + assert db_name in str(excinfo) + + # Database exists but some other error + error_message = "My query error message" + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content=error_message, + url=query_url, + match_content=b"select * from t", + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(ProgrammingError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR + assert error_message in str(excinfo) + # Engine is not running error httpx_mock.add_response( status_code=codes.SERVICE_UNAVAILABLE, content="Query error message", url=query_url, ) - httpx_mock.add_callback( - get_engine_url_not_running_callback, + httpx_mock.add_response( url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, ) with raises(EngineNotRunningError) as excinfo: await query() assert cursor._state == CursorState.ERROR + assert settings.server in str(excinfo) + + # Engine does not exist + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltEngineError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR httpx_mock.reset(True) @@ -487,11 +570,11 @@ async def test_cursor_multi_statement( for i, (desc, exp) in enumerate(zip(cursor.description, python_query_description)): assert desc == exp, f"Invalid column description at position {i}" - assert cursor.statistics.elapsed == 0.002983335 - assert cursor.statistics.time_before_execution == 0.002729331 - assert cursor.statistics.time_to_execute == 0.000215215 + assert cursor.statistics.elapsed == 0.116907717 + assert cursor.statistics.time_before_execution == 0.012180623 + assert cursor.statistics.time_to_execute == 0.104614307 assert cursor.statistics.rows_read == 1 - assert cursor.statistics.bytes_read == 1 + assert cursor.statistics.bytes_read == 61 assert cursor.statistics.scanned_bytes_cache == 0 assert cursor.statistics.scanned_bytes_storage == 0 @@ -664,7 +747,11 @@ async def test_cursor_server_side_async_cancel( httpx_mock.add_callback( server_side_async_cancel_callback, url=query_with_params_url ) + cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} await cursor.cancel(server_side_async_id) + cursor.close() + with raises(CursorClosedError): + await cursor.cancel(server_side_async_id) async def test_cursor_server_side_async_get_status_completed( @@ -727,3 +814,31 @@ async def test_cursor_server_side_async_get_status_error( str(excinfo.value) == f"Asynchronous query {server_side_async_id} status check failed." ), f"Invalid get_status error message." + + +async def test_cursor_iterate( + httpx_mock: HTTPXMock, + query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_data: List[List[ColType]], +): + """Cursor is able to execute query, all fields are populated properly.""" + + httpx_mock.add_callback(query_callback, url=query_url) + + with raises(QueryNotRunError): + async for res in cursor: + pass + + await cursor.execute("select * from t") + i = 0 + async for res in cursor: + assert res in python_query_data + i += 1 + assert i == len(python_query_data), "Wrong number iterations of a cursor were done" + + cursor.close() + with raises(CursorClosedError): + async for res in cursor: + pass diff --git a/tests/unit/db/conftest.py b/tests/unit/db/conftest.py index 85b3a1d6082..ec8d299f9f8 100644 --- a/tests/unit/db/conftest.py +++ b/tests/unit/db/conftest.py @@ -26,6 +26,24 @@ def connection( yield connection +@fixture +def system_connection( + server: str, + db_name: str, + auth: Auth, + account_name: str, + mock_system_connection_flow: Callable, +) -> Connection: + mock_system_connection_flow() + with connect( + database=db_name, + auth=auth, + account_name=account_name, + api_endpoint=server, + ) as connection: + yield connection + + @fixture def cursor(connection: Connection) -> Cursor: return connection.cursor() diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index 89ef9e741cb..1399922558f 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -20,6 +20,14 @@ from firebolt.utils.token_storage import TokenSecureStorage +def test_connection_attributes(connection: Connection) -> None: + with raises(AttributeError): + connection.not_a_cursor() + + with raises(AttributeError): + connection.not_a_database + + def test_closed_connection(connection: Connection) -> None: """Connection methods are unavailable for closed connection.""" connection.close() @@ -36,13 +44,14 @@ def test_closed_connection(connection: Connection) -> None: def test_cursors_closed_on_close(connection: Connection) -> None: """Connection closes all its cursors on close.""" + assert connection.closed == False, "Initial state of connection is incorrect" c1, c2 = connection.cursor(), connection.cursor() assert ( len(connection._cursors) == 2 ), "Invalid number of cursors stored in connection" connection.close() - assert connection.closed, "Connection was not closed on close" + assert connection.closed == True, "Connection was not closed on close" assert c1.closed, "Cursor was not closed on connection close" assert c2.closed, "Cursor was not closed on connection close" assert len(connection._cursors) == 0, "Cursors left in connection after close" @@ -177,8 +186,8 @@ def test_connect_database( def test_connection_unclosed_warnings(auth: Auth): c = Connection("", "", auth, "", None) with warns(UserWarning) as winfo: - del c - gc.collect() + # Can't guarantee `del c` triggers garbage collection + c.__del__() assert any( "Unclosed" in str(warning.message) for warning in winfo.list @@ -219,6 +228,19 @@ def test_connection_token_caching( mock_connection_flow() mock_query() + # Using caching by default + with Patcher(): + with connect( + database=db_name, + auth=ClientCredentials(client_id, client_secret), + engine_name=engine_name, + account_name=account_name, + api_endpoint=server, + ) as connection: + assert connection.cursor().execute("select*") == len(python_query_data) + ts = TokenSecureStorage(username=client_id, password=client_secret) + assert ts.get_cached_token() == access_token, "Invalid token value cached" + with Patcher(): with connect( database=db_name, diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index 04f9082012b..d61bc23a531 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -1,16 +1,21 @@ -from typing import Callable, Dict, List +from typing import Any, Callable, Dict, List from unittest.mock import patch from httpx import HTTPStatusError, StreamError, codes from pytest import raises from pytest_httpx import HTTPXMock +from firebolt.common.settings import Settings from firebolt.db import Cursor from firebolt.db.cursor import ColType, Column, CursorState, QueryStatus from firebolt.utils.exception import ( CursorClosedError, DataError, + EngineNotRunningError, + FireboltDatabaseError, + FireboltEngineError, OperationalError, + ProgrammingError, QueryNotRunError, ) from tests.unit.db_conftest import encode_param @@ -182,8 +187,13 @@ def test_cursor_execute( def test_cursor_execute_error( httpx_mock: HTTPXMock, + get_engines_url: str, + settings: Settings, + db_name: str, query_url: str, + query_statistics: Dict[str, Any], cursor: Cursor, + system_engine_query_url: str, ): """Cursor handles all types of errors properly.""" for query, message in ( @@ -231,6 +241,96 @@ def http_error(*args, **kwargs): assert ( str(excinfo.value) == "Error executing query:\nQuery error message" ), f"Invalid authentication error message for {message}." + + # Database does not exist error + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltDatabaseError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert db_name in str(excinfo) + + # Database exists but some other error + error_message = "My query error message" + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content=error_message, + url=query_url, + match_content=b"select * from t", + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(ProgrammingError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert error_message in str(excinfo) + + # Engine is not running error + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, + ) + with raises(EngineNotRunningError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert settings.server in str(excinfo) + + # Engine does not exist + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltEngineError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + httpx_mock.reset(True) @@ -551,8 +651,13 @@ def test_cursor_server_side_async_cancel( httpx_mock.add_callback( server_side_async_cancel_callback, url=query_with_params_url ) + cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} cursor.cancel(server_side_async_id) + cursor.close() + with raises(CursorClosedError): + cursor.cancel(server_side_async_id) + def test_cursor_server_side_async_get_status_completed( httpx_mock: HTTPXMock, @@ -614,3 +719,31 @@ def test_cursor_server_side_async_get_status_error( str(excinfo.value) == f"Asynchronous query {server_side_async_id} status check failed." ), f"Invalid get_status error message." + + +def test_cursor_iterate( + httpx_mock: HTTPXMock, + query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_data: List[List[ColType]], +): + """Cursor is able to execute query, all fields are populated properly.""" + + httpx_mock.add_callback(query_callback, url=query_url) + + with raises(QueryNotRunError): + for res in cursor: + pass + + cursor.execute("select * from t") + i = 0 + for res in cursor: + assert res == python_query_data[i] + i += 1 + assert i == len(python_query_data), "Wrong number iterations of a cursor were done" + + cursor.close() + with raises(CursorClosedError): + for res in cursor: + pass diff --git a/tests/unit/db/test_util.py b/tests/unit/db/test_util.py new file mode 100644 index 00000000000..a21ac69b73a --- /dev/null +++ b/tests/unit/db/test_util.py @@ -0,0 +1,99 @@ +from typing import Any, Dict + +from pytest_httpx import HTTPXMock + +from firebolt.db.connection import Connection +from firebolt.db.util import is_db_available, is_engine_running + + +def test_is_db_available( + connection: Connection, + httpx_mock: HTTPXMock, + query_statistics: Dict[str, Any], + system_engine_query_url: str, +): + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, + ) + assert is_db_available(connection, "dummy") == True + + +def test_is_db_not_available( + connection: Connection, + httpx_mock: HTTPXMock, + system_engine_query_url: str, + query_statistics: Dict[str, Any], +): + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + assert is_db_available(connection, "dummy") == False + + +def test_is_engine_running_system( + system_connection: Connection, +): + # System engine is always running + assert is_engine_running(system_connection, "dummy") == True + + +def test_is_engine_running( + connection: Connection, + httpx_mock: HTTPXMock, + system_engine_query_url: str, + query_statistics: Dict[str, Any], + get_engines_url: str, +): + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Running"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, + ) + assert is_engine_running(connection, get_engines_url) == True + + +def test_is_engine_not_running( + connection: Connection, + httpx_mock: HTTPXMock, + system_engine_query_url: str, + query_statistics: Dict[str, Any], + get_engines_url: str, +): + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, + ) + assert is_engine_running(connection, get_engines_url) == False diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index c5a740ac0d9..5ed5a951758 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -137,8 +137,32 @@ def server_side_async_id() -> str: @fixture -def server_side_async_cancel_callback(server_side_async_id) -> Response: +def query_statistics() -> Dict[str, Any]: + # Just some dummy statistics to have in query response + return { + "elapsed": 0.116907717, + "rows_read": 1, + "bytes_read": 61, + "time_before_execution": 0.012180623, + "time_to_execute": 0.104614307, + "scanned_bytes_cache": 0, + "scanned_bytes_storage": 0, + } + + +@fixture +def server_side_async_cancel_callback( + server_side_async_id, query_statistics: Dict[str, Any] +) -> Response: def do_query(request: Request, **kwargs) -> Response: + # Make sure no set parameters are added + assert sorted(list(request.url.params.keys())) == [ + "database", + "query_id", + ], "invalid query params for async cancel" + assert request.url.path == "/cancel" + # Cancel has no body + assert request.read() == b"" query_response = { "meta": [ {"name": "host", "type": "String"}, @@ -150,15 +174,7 @@ def do_query(request: Request, **kwargs) -> Response: ], "data": [["node1.node.consul", 9000, 0, "", 0, 0]], "rows": 1, - "statistics": { - "elapsed": 0.116907717, - "rows_read": 1, - "bytes_read": 61, - "time_before_execution": 0.012180623, - "time_to_execute": 0.104614307, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -233,22 +249,19 @@ def do_query(request: Request, **kwargs) -> Response: @fixture def query_callback( - query_description: List[Column], query_data: List[List[ColType]] + query_description: List[Column], + query_data: List[List[ColType]], + query_statistics: Dict[str, Any], ) -> Callable: def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) query_response = { "meta": [{"name": c.name, "type": c.type_code} for c in query_description], "data": query_data, "rows": len(query_data), - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -285,6 +298,7 @@ def query_with_params_callback( query_description: List[Column], query_data: List[List[ColType]], set_params: Dict, + query_statistics: Dict[str, Any], ) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params @@ -296,16 +310,7 @@ def do_query(request: Request, **kwargs) -> Response: "meta": [{"name": c.name, "type": c.type_code} for c in query_description], "data": query_data, "rows": len(query_data), - # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -350,7 +355,9 @@ def query_with_params_url(query_url: str, set_params: str) -> str: query_url = f"{query_url}&{params_encoded}" -def _get_engine_url_callback(server: str, db_name: str, status="Running") -> Callable: +def _get_engine_url_callback( + server: str, db_name: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params assert ( @@ -364,16 +371,7 @@ def do_query(request: Request, **kwargs) -> Response: "meta": [{"name": "name", "type": "Text"} for _ in range(len(data[0]))], "data": data, "rows": len(data), - # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -381,21 +379,31 @@ def do_query(request: Request, **kwargs) -> Response: @fixture -def get_engine_url_callback(server: str, db_name: str, status="Running") -> Callable: - return _get_engine_url_callback(server, db_name) +def get_engine_url_callback( + server: str, db_name: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: + return _get_engine_url_callback(server, db_name, query_statistics) @fixture -def get_engine_url_not_running_callback(engine_name, db_name) -> Callable: - return _get_engine_url_callback(engine_name, db_name, "Stopped") +def get_engine_url_not_running_callback( + engine_name, db_name, query_statistics: Dict[str, Any] +) -> Callable: + return _get_engine_url_callback(engine_name, db_name, query_statistics, "Stopped") @fixture -def get_engine_url_invalid_db_callback(engine_name, db_name) -> Callable: - return _get_engine_url_callback(engine_name, "not_" + db_name) +def get_engine_url_invalid_db_callback( + engine_name, + db_name, + query_statistics: Dict[str, Any], +) -> Callable: + return _get_engine_url_callback(engine_name, "not_" + db_name, query_statistics) -def _get_default_db_engine_callback(server: str, status="Running") -> Callable: +def _get_default_db_engine_callback( + server: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params assert len(set_parameters) == 1 and "output_format" in set_parameters @@ -405,15 +413,7 @@ def do_query(request: Request, **kwargs) -> Response: "data": data, "rows": len(data), # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -493,6 +493,21 @@ def inner() -> None: return inner +@fixture +def mock_system_connection_flow( + httpx_mock: HTTPXMock, + auth_url: str, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, +) -> Callable: + def inner() -> None: + httpx_mock.add_callback(check_credentials_callback, url=auth_url) + httpx_mock.add_callback(get_system_engine_callback, url=get_system_engine_url) + + return inner + + @fixture def mock_query( httpx_mock: HTTPXMock,