From 301cc38152a35dea6f4d28b1200a9322b5da535b Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 30 May 2023 18:40:16 +0100 Subject: [PATCH 1/5] Additional tests --- src/firebolt/common/base_cursor.py | 1 - src/firebolt/db/connection.py | 8 ++- src/firebolt/db/cursor.py | 10 +-- tests/unit/async_db/test_connection.py | 29 +++++++- tests/unit/async_db/test_cursor.py | 74 ++++++++++++++++++++ tests/unit/db/test_connection.py | 31 ++++++++- tests/unit/db/test_cursor.py | 96 ++++++++++++++++++++++++++ tests/unit/db/test_util.py | 72 +++++++++++++++++++ tests/unit/db_conftest.py | 11 +++ 9 files changed, 316 insertions(+), 16 deletions(-) create mode 100644 tests/unit/db/test_util.py diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index b0925bfc001..7b278595a03 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -103,7 +103,6 @@ class BaseCursor: "_rowcount", "_rows", "_idx", - "_idx_lock", "_row_sets", "_next_set_idx", "_set_parameters", diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index adb8bf46e71..5cf64f6d410 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -139,7 +139,9 @@ def __init__( def cursor(self, **kwargs: Any) -> Cursor: if self.closed: - raise ConnectionClosedError("Unable to create cursor: connection closed.") + raise ConnectionClosedError( + "Unable to create cursor: connection closed." # pragma: no mutate + ) with self._closing_lock.gen_rlock(): c = Cursor(client=self._client, connection=self, **kwargs) @@ -168,7 +170,9 @@ def close(self) -> None: # Context manager support def __enter__(self) -> Connection: if self.closed: - raise ConnectionClosedError("Connection is already closed.") + raise ConnectionClosedError( + "Connection is already closed." # pragma: no mutate + ) return self def __exit__( diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index c516d1e8e25..1ffdd7f91c8 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -16,7 +16,6 @@ ) from httpx import Response, codes -from readerwriterlock.rwlock import RWLockWrite from firebolt.client import Client from firebolt.common._types import ( @@ -65,17 +64,10 @@ class Cursor(BaseCursor): with the :py:func:`fetchmany` method """ - __slots__ = BaseCursor.__slots__ + ( - "_query_lock", - "_idx_lock", - ) - def __init__( self, *args: Any, client: Client, connection: Connection, **kwargs: Any ) -> None: super().__init__(*args, **kwargs) - self._query_lock = RWLockWrite() - self._idx_lock = Lock() self._client = client self.connection = connection @@ -100,7 +92,7 @@ def _raise_if_error(self, resp: Response) -> None: if not is_engine_running(self.connection, self.connection.engine_url): raise EngineNotRunningError( f"Firebolt engine {self.connection.engine_url} " - "needs to be running to run queries against it." + "needs to be running to run queries against it." # pragma: no mutate # noqa: E501 ) resp.raise_for_status() diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index ac63f1f4aa3..9b741741227 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -18,6 +18,14 @@ from firebolt.utils.token_storage import TokenSecureStorage +async def test_connection_attributes(connection: Connection) -> None: + with raises(AttributeError): + await connection.not_a_cursor() + + with raises(AttributeError): + connection.not_a_database + + async def test_closed_connection(connection: Connection) -> None: """Connection methods are unavailable for closed connection.""" await connection.aclose() @@ -34,13 +42,14 @@ async def test_closed_connection(connection: Connection) -> None: async def test_cursors_closed_on_close(connection: Connection) -> None: """Connection closes all its cursors on close.""" + assert connection.closed == False, "Initial state of connection is incorrect" c1, c2 = connection.cursor(), connection.cursor() assert ( len(connection._cursors) == 2 ), "Invalid number of cursors stored in connection." await connection.aclose() - assert connection.closed, "Connection was not closed on close." + assert connection.closed == True, "Connection was not closed on close." assert c1.closed, "Cursor was not closed on connection close." assert c2.closed, "Cursor was not closed on connection close." assert len(connection._cursors) == 0, "Cursors left in connection after close." @@ -197,6 +206,24 @@ async def test_connection_token_caching( mock_connection_flow() mock_query() + # Using caching by default + with Patcher(): + async with await connect( + database=db_name, + username=settings.user, + password=settings.password.get_secret_value(), + engine_url=settings.server, + account_name=settings.account_name, + api_endpoint=settings.server, + ) as connection: + assert await connection.cursor().execute("select*") == len( + python_query_data + ) + ts = TokenSecureStorage( + username=settings.user, password=settings.password.get_secret_value() + ) + assert ts.get_cached_token() == access_token, "Invalid token value cached" + with Patcher(): async with await connect( database=db_name, diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index c9e02e69a64..fbd99f711a3 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -8,12 +8,15 @@ from firebolt.async_db import Cursor from firebolt.common._types import Column from firebolt.common.base_cursor import ColType, CursorState, QueryStatus +from firebolt.common.settings import Settings from firebolt.utils.exception import ( AsyncExecutionUnavailableError, CursorClosedError, DataError, EngineNotRunningError, + FireboltDatabaseError, OperationalError, + ProgrammingError, QueryNotRunError, ) from tests.unit.db_conftest import encode_param @@ -189,6 +192,10 @@ async def test_cursor_execute( async def test_cursor_execute_error( httpx_mock: HTTPXMock, query_url: str, + get_engines_url: str, + settings: Settings, + db_name: str, + get_databases_url: str, cursor: Cursor, get_engine_url_not_running_callback: Callable, system_engine_query_url: str, @@ -240,6 +247,37 @@ def http_error(*args, **kwargs): str(excinfo.value) == "Error executing query:\nQuery error message" ), f"Invalid authentication error message for {message}." + # Database does not exist error + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + json={"edges": []}, + url=get_databases_url + "?filter.name_contains=database", + ) + with raises(FireboltDatabaseError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR + assert db_name in str(excinfo) + + # Database exists but some other error + error_message = "My query error message" + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content=error_message, + url=query_url, + ) + httpx_mock.add_response( + json={"edges": ["my_db"]}, + url=get_databases_url + "?filter.name_contains=database", + ) + with raises(ProgrammingError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR + assert error_message in str(excinfo) + # Engine is not running error httpx_mock.add_response( status_code=codes.SERVICE_UNAVAILABLE, @@ -253,6 +291,7 @@ def http_error(*args, **kwargs): with raises(EngineNotRunningError) as excinfo: await query() assert cursor._state == CursorState.ERROR + assert settings.server in str(excinfo) httpx_mock.reset(True) @@ -664,7 +703,11 @@ async def test_cursor_server_side_async_cancel( httpx_mock.add_callback( server_side_async_cancel_callback, url=query_with_params_url ) + cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} await cursor.cancel(server_side_async_id) + cursor.close() + with raises(CursorClosedError): + await cursor.cancel(server_side_async_id) async def test_cursor_server_side_async_get_status_completed( @@ -727,3 +770,34 @@ async def test_cursor_server_side_async_get_status_error( str(excinfo.value) == f"Asynchronous query {server_side_async_id} status check failed." ), f"Invalid get_status error message." + + +async def test_cursor_iterate( + httpx_mock: HTTPXMock, + auth_callback: Callable, + auth_url: str, + query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_data: List[List[ColType]], +): + """Cursor is able to execute query, all fields are populated properly.""" + + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_callback(query_callback, url=query_url) + + with raises(QueryNotRunError): + async for res in cursor: + pass + + await cursor.execute("select * from t") + i = 0 + async for res in cursor: + assert res in python_query_data + i += 1 + assert i == len(python_query_data), "Wrong number iterations of a cursor were done" + + cursor.close() + with raises(CursorClosedError): + async for res in cursor: + pass diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index 89ef9e741cb..99d9cab11f0 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -20,6 +20,14 @@ from firebolt.utils.token_storage import TokenSecureStorage +def test_connection_attributes(connection: Connection) -> None: + with raises(AttributeError): + connection.not_a_cursor() + + with raises(AttributeError): + connection.not_a_database + + def test_closed_connection(connection: Connection) -> None: """Connection methods are unavailable for closed connection.""" connection.close() @@ -36,13 +44,14 @@ def test_closed_connection(connection: Connection) -> None: def test_cursors_closed_on_close(connection: Connection) -> None: """Connection closes all its cursors on close.""" + assert connection.closed == False, "Initial state of connection is incorrect" c1, c2 = connection.cursor(), connection.cursor() assert ( len(connection._cursors) == 2 ), "Invalid number of cursors stored in connection" connection.close() - assert connection.closed, "Connection was not closed on close" + assert connection.closed == True, "Connection was not closed on close" assert c1.closed, "Cursor was not closed on connection close" assert c2.closed, "Cursor was not closed on connection close" assert len(connection._cursors) == 0, "Cursors left in connection after close" @@ -177,8 +186,8 @@ def test_connect_database( def test_connection_unclosed_warnings(auth: Auth): c = Connection("", "", auth, "", None) with warns(UserWarning) as winfo: - del c - gc.collect() + # Can't guarantee `del c` triggers garbage collection + c.__del__() assert any( "Unclosed" in str(warning.message) for warning in winfo.list @@ -219,6 +228,22 @@ def test_connection_token_caching( mock_connection_flow() mock_query() + # Using caching by default + with Patcher(): + with connect( + database=db_name, + username=settings.user, + password=settings.password.get_secret_value(), + engine_url=settings.server, + account_name=settings.account_name, + api_endpoint=settings.server, + ) as connection: + assert connection.cursor().execute("select*") == len(python_query_data) + ts = TokenSecureStorage( + username=settings.user, password=settings.password.get_secret_value() + ) + assert ts.get_cached_token() == access_token, "Invalid token value cached" + with Patcher(): with connect( database=db_name, diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index 04f9082012b..eb175814180 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -5,12 +5,16 @@ from pytest import raises from pytest_httpx import HTTPXMock +from firebolt.common.settings import Settings from firebolt.db import Cursor from firebolt.db.cursor import ColType, Column, CursorState, QueryStatus from firebolt.utils.exception import ( CursorClosedError, DataError, + EngineNotRunningError, + FireboltDatabaseError, OperationalError, + ProgrammingError, QueryNotRunError, ) from tests.unit.db_conftest import encode_param @@ -182,7 +186,13 @@ def test_cursor_execute( def test_cursor_execute_error( httpx_mock: HTTPXMock, + auth_callback: Callable, + auth_url: str, + get_engines_url: str, + settings: Settings, + db_name: str, query_url: str, + get_databases_url: str, cursor: Cursor, ): """Cursor handles all types of errors properly.""" @@ -231,6 +241,56 @@ def http_error(*args, **kwargs): assert ( str(excinfo.value) == "Error executing query:\nQuery error message" ), f"Invalid authentication error message for {message}." + + # Database does not exist error + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + json={"edges": []}, + url=get_databases_url + "?filter.name_contains=database", + ) + with raises(FireboltDatabaseError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert db_name in str(excinfo) + + # Database exists but some other error + error_message = "My query error message" + httpx_mock.add_response( + status_code=codes.FORBIDDEN, + content=error_message, + url=query_url, + ) + httpx_mock.add_response( + json={"edges": ["my_db"]}, + url=get_databases_url + "?filter.name_contains=database", + ) + with raises(ProgrammingError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert error_message in str(excinfo) + + # Engine is not running error + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + json={"edges": []}, + url=( + get_engines_url + "?filter.name_contains=api_dev" + "&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING" + ), + ) + with raises(EngineNotRunningError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + assert settings.server in str(excinfo) + httpx_mock.reset(True) @@ -551,8 +611,13 @@ def test_cursor_server_side_async_cancel( httpx_mock.add_callback( server_side_async_cancel_callback, url=query_with_params_url ) + cursor._set_parameters = {"invalid_parameter": "should_not_be_present"} cursor.cancel(server_side_async_id) + cursor.close() + with raises(CursorClosedError): + cursor.cancel(server_side_async_id) + def test_cursor_server_side_async_get_status_completed( httpx_mock: HTTPXMock, @@ -614,3 +679,34 @@ def test_cursor_server_side_async_get_status_error( str(excinfo.value) == f"Asynchronous query {server_side_async_id} status check failed." ), f"Invalid get_status error message." + + +def test_cursor_iterate( + httpx_mock: HTTPXMock, + auth_callback: Callable, + auth_url: str, + query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_data: List[List[ColType]], +): + """Cursor is able to execute query, all fields are populated properly.""" + + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_callback(query_callback, url=query_url) + + with raises(QueryNotRunError): + for res in cursor: + pass + + cursor.execute("select * from t") + i = 0 + for res in cursor: + assert res == python_query_data[i] + i += 1 + assert i == len(python_query_data), "Wrong number iterations of a cursor were done" + + cursor.close() + with raises(CursorClosedError): + for res in cursor: + pass diff --git a/tests/unit/db/test_util.py b/tests/unit/db/test_util.py new file mode 100644 index 00000000000..9b6cc900b1c --- /dev/null +++ b/tests/unit/db/test_util.py @@ -0,0 +1,72 @@ +from typing import Callable + +from pytest_httpx import HTTPXMock + +from firebolt.common.settings import Settings +from firebolt.db.connection import Connection +from firebolt.db.util import is_db_available, is_engine_running +from firebolt.utils.urls import DATABASES_URL, ENGINES_URL + + +def test_is_db_available( + connection: Connection, + httpx_mock: HTTPXMock, + settings: Settings, + auth_url: str, + auth_callback: Callable, +): + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_response( + url=f"https://{settings.server}{DATABASES_URL}?filter.name_contains=dummy", + method="GET", + json={"edges": ["one"]}, + ) + assert is_db_available(connection, "dummy") == True + + +def test_is_db_not_available( + connection: Connection, + httpx_mock: HTTPXMock, + settings: Settings, + auth_url: str, + auth_callback: Callable, +): + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_response( + url=f"https://{settings.server}{DATABASES_URL}?filter.name_contains=dummy", + method="GET", + json={"edges": []}, + ) + assert is_db_available(connection, "dummy") == False + + +def test_is_engine_running( + connection: Connection, + httpx_mock: HTTPXMock, + settings: Settings, + auth_url: str, + auth_callback: Callable, +): + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_response( + url=f"https://{settings.server}{ENGINES_URL}?filter.name_contains=my_engine&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING", + method="GET", + json={"edges": ["one"]}, + ) + assert is_engine_running(connection, "https://my-engine.dev.firebolt.io") == True + + +def test_is_engine_not_running( + connection: Connection, + httpx_mock: HTTPXMock, + settings: Settings, + auth_url: str, + auth_callback: Callable, +): + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_response( + url=f"https://{settings.server}{ENGINES_URL}?filter.name_contains=my_engine&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING", + method="GET", + json={"edges": []}, + ) + assert is_engine_running(connection, "https://my-engine.dev.firebolt.io") == False diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index c5a740ac0d9..541e4dd35d9 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -139,6 +139,14 @@ def server_side_async_id() -> str: @fixture def server_side_async_cancel_callback(server_side_async_id) -> Response: def do_query(request: Request, **kwargs) -> Response: + # Make sure no set parameters are added + assert list(request.url.params.keys()) == [ + "database", + "query_id", + ], "invalid query params for async cancel" + assert request.url.path == "/cancel" + # Cancel has no body + assert request.read() == b"" query_response = { "meta": [ {"name": "host", "type": "String"}, @@ -236,6 +244,9 @@ def query_callback( query_description: List[Column], query_data: List[List[ColType]] ) -> Callable: def do_query(request: Request, **kwargs) -> Response: + assert request.read() != b"" + assert request.method == "POST" + assert f"output_format={JSON_OUTPUT_FORMAT}" in str(request.url) query_response = { "meta": [{"name": c.name, "type": c.type_code} for c in query_description], "data": query_data, From 09e2e7013a9e499efec607c92e723f634b476df3 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 31 May 2023 09:45:42 +0100 Subject: [PATCH 2/5] fix tests after merge --- tests/unit/async_db/test_connection.py | 6 ++---- tests/unit/db/test_connection.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 9b741741227..c2be372d5a2 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -211,7 +211,7 @@ async def test_connection_token_caching( async with await connect( database=db_name, username=settings.user, - password=settings.password.get_secret_value(), + password=settings.password, engine_url=settings.server, account_name=settings.account_name, api_endpoint=settings.server, @@ -219,9 +219,7 @@ async def test_connection_token_caching( assert await connection.cursor().execute("select*") == len( python_query_data ) - ts = TokenSecureStorage( - username=settings.user, password=settings.password.get_secret_value() - ) + ts = TokenSecureStorage(username=settings.user, password=settings.password) assert ts.get_cached_token() == access_token, "Invalid token value cached" with Patcher(): diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index 99d9cab11f0..dbce0bf0657 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -233,15 +233,13 @@ def test_connection_token_caching( with connect( database=db_name, username=settings.user, - password=settings.password.get_secret_value(), + password=settings.password, engine_url=settings.server, account_name=settings.account_name, api_endpoint=settings.server, ) as connection: assert connection.cursor().execute("select*") == len(python_query_data) - ts = TokenSecureStorage( - username=settings.user, password=settings.password.get_secret_value() - ) + ts = TokenSecureStorage(username=settings.user, password=settings.password) assert ts.get_cached_token() == access_token, "Invalid token value cached" with Patcher(): From 61bd13b8f7401cea9fc134dab9555fe8ecfc8618 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 19 Jun 2023 14:31:12 +0100 Subject: [PATCH 3/5] fixing tests after rebase --- tests/unit/async_db/test_connection.py | 11 ++- tests/unit/async_db/test_cursor.py | 73 +++++++++++---- tests/unit/db/conftest.py | 18 ++++ tests/unit/db/test_connection.py | 11 ++- tests/unit/db/test_cursor.py | 69 ++++++++++---- tests/unit/db/test_util.py | 93 ++++++++++++------- tests/unit/db_conftest.py | 120 +++++++++++++------------ 7 files changed, 260 insertions(+), 135 deletions(-) diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index c2be372d5a2..bd3d9701879 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -210,16 +210,15 @@ async def test_connection_token_caching( with Patcher(): async with await connect( database=db_name, - username=settings.user, - password=settings.password, - engine_url=settings.server, - account_name=settings.account_name, - api_endpoint=settings.server, + auth=ClientCredentials(client_id, client_secret, use_token_cache=True), + engine_name=engine_name, + account_name=account_name, + api_endpoint=server, ) as connection: assert await connection.cursor().execute("select*") == len( python_query_data ) - ts = TokenSecureStorage(username=settings.user, password=settings.password) + ts = TokenSecureStorage(username=client_id, password=client_secret) assert ts.get_cached_token() == access_token, "Invalid token value cached" with Patcher(): diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index fbd99f711a3..95cdb0b6b1c 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -1,4 +1,4 @@ -from typing import Callable, Dict, List +from typing import Any, Callable, Dict, List from unittest.mock import patch from httpx import HTTPStatusError, StreamError, codes @@ -15,6 +15,7 @@ DataError, EngineNotRunningError, FireboltDatabaseError, + FireboltEngineError, OperationalError, ProgrammingError, QueryNotRunError, @@ -195,9 +196,8 @@ async def test_cursor_execute_error( get_engines_url: str, settings: Settings, db_name: str, - get_databases_url: str, + query_statistics: Dict[str, Any], cursor: Cursor, - get_engine_url_not_running_callback: Callable, system_engine_query_url: str, ): """Cursor handles all types of errors properly.""" @@ -252,10 +252,17 @@ def http_error(*args, **kwargs): status_code=codes.FORBIDDEN, content="Query error message", url=query_url, + match_content=b"select * from t", ) httpx_mock.add_response( - json={"edges": []}, - url=get_databases_url + "?filter.name_contains=database", + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, ) with raises(FireboltDatabaseError) as excinfo: await query() @@ -268,10 +275,17 @@ def http_error(*args, **kwargs): status_code=codes.FORBIDDEN, content=error_message, url=query_url, + match_content=b"select * from t", ) httpx_mock.add_response( - json={"edges": ["my_db"]}, - url=get_databases_url + "?filter.name_contains=database", + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, ) with raises(ProgrammingError) as excinfo: await query() @@ -284,15 +298,45 @@ def http_error(*args, **kwargs): content="Query error message", url=query_url, ) - httpx_mock.add_callback( - get_engine_url_not_running_callback, + httpx_mock.add_response( url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, ) with raises(EngineNotRunningError) as excinfo: await query() assert cursor._state == CursorState.ERROR assert settings.server in str(excinfo) + # Engine does not exist + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltEngineError) as excinfo: + await query() + assert cursor._state == CursorState.ERROR + httpx_mock.reset(True) @@ -526,11 +570,11 @@ async def test_cursor_multi_statement( for i, (desc, exp) in enumerate(zip(cursor.description, python_query_description)): assert desc == exp, f"Invalid column description at position {i}" - assert cursor.statistics.elapsed == 0.002983335 - assert cursor.statistics.time_before_execution == 0.002729331 - assert cursor.statistics.time_to_execute == 0.000215215 + assert cursor.statistics.elapsed == 0.116907717 + assert cursor.statistics.time_before_execution == 0.012180623 + assert cursor.statistics.time_to_execute == 0.104614307 assert cursor.statistics.rows_read == 1 - assert cursor.statistics.bytes_read == 1 + assert cursor.statistics.bytes_read == 61 assert cursor.statistics.scanned_bytes_cache == 0 assert cursor.statistics.scanned_bytes_storage == 0 @@ -774,8 +818,6 @@ async def test_cursor_server_side_async_get_status_error( async def test_cursor_iterate( httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, query_callback: Callable, query_url: str, cursor: Cursor, @@ -783,7 +825,6 @@ async def test_cursor_iterate( ): """Cursor is able to execute query, all fields are populated properly.""" - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_callback(query_callback, url=query_url) with raises(QueryNotRunError): diff --git a/tests/unit/db/conftest.py b/tests/unit/db/conftest.py index 85b3a1d6082..ec8d299f9f8 100644 --- a/tests/unit/db/conftest.py +++ b/tests/unit/db/conftest.py @@ -26,6 +26,24 @@ def connection( yield connection +@fixture +def system_connection( + server: str, + db_name: str, + auth: Auth, + account_name: str, + mock_system_connection_flow: Callable, +) -> Connection: + mock_system_connection_flow() + with connect( + database=db_name, + auth=auth, + account_name=account_name, + api_endpoint=server, + ) as connection: + yield connection + + @fixture def cursor(connection: Connection) -> Cursor: return connection.cursor() diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index dbce0bf0657..1399922558f 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -232,14 +232,13 @@ def test_connection_token_caching( with Patcher(): with connect( database=db_name, - username=settings.user, - password=settings.password, - engine_url=settings.server, - account_name=settings.account_name, - api_endpoint=settings.server, + auth=ClientCredentials(client_id, client_secret), + engine_name=engine_name, + account_name=account_name, + api_endpoint=server, ) as connection: assert connection.cursor().execute("select*") == len(python_query_data) - ts = TokenSecureStorage(username=settings.user, password=settings.password) + ts = TokenSecureStorage(username=client_id, password=client_secret) assert ts.get_cached_token() == access_token, "Invalid token value cached" with Patcher(): diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index eb175814180..d61bc23a531 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -1,4 +1,4 @@ -from typing import Callable, Dict, List +from typing import Any, Callable, Dict, List from unittest.mock import patch from httpx import HTTPStatusError, StreamError, codes @@ -13,6 +13,7 @@ DataError, EngineNotRunningError, FireboltDatabaseError, + FireboltEngineError, OperationalError, ProgrammingError, QueryNotRunError, @@ -186,14 +187,13 @@ def test_cursor_execute( def test_cursor_execute_error( httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, get_engines_url: str, settings: Settings, db_name: str, query_url: str, - get_databases_url: str, + query_statistics: Dict[str, Any], cursor: Cursor, + system_engine_query_url: str, ): """Cursor handles all types of errors properly.""" for query, message in ( @@ -249,8 +249,14 @@ def http_error(*args, **kwargs): url=query_url, ) httpx_mock.add_response( - json={"edges": []}, - url=get_databases_url + "?filter.name_contains=database", + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, ) with raises(FireboltDatabaseError) as excinfo: query() @@ -263,10 +269,17 @@ def http_error(*args, **kwargs): status_code=codes.FORBIDDEN, content=error_message, url=query_url, + match_content=b"select * from t", ) httpx_mock.add_response( - json={"edges": ["my_db"]}, - url=get_databases_url + "?filter.name_contains=database", + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, ) with raises(ProgrammingError) as excinfo: query() @@ -280,17 +293,44 @@ def http_error(*args, **kwargs): url=query_url, ) httpx_mock.add_response( - json={"edges": []}, - url=( - get_engines_url + "?filter.name_contains=api_dev" - "&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING" - ), + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, ) with raises(EngineNotRunningError) as excinfo: query() assert cursor._state == CursorState.ERROR assert settings.server in str(excinfo) + # Engine does not exist + httpx_mock.add_response( + status_code=codes.SERVICE_UNAVAILABLE, + content="Query error message", + url=query_url, + ) + httpx_mock.add_response( + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, + ) + with raises(FireboltEngineError) as excinfo: + query() + assert cursor._state == CursorState.ERROR + httpx_mock.reset(True) @@ -683,8 +723,6 @@ def test_cursor_server_side_async_get_status_error( def test_cursor_iterate( httpx_mock: HTTPXMock, - auth_callback: Callable, - auth_url: str, query_callback: Callable, query_url: str, cursor: Cursor, @@ -692,7 +730,6 @@ def test_cursor_iterate( ): """Cursor is able to execute query, all fields are populated properly.""" - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_callback(query_callback, url=query_url) with raises(QueryNotRunError): diff --git a/tests/unit/db/test_util.py b/tests/unit/db/test_util.py index 9b6cc900b1c..a21ac69b73a 100644 --- a/tests/unit/db/test_util.py +++ b/tests/unit/db/test_util.py @@ -1,25 +1,26 @@ -from typing import Callable +from typing import Any, Dict from pytest_httpx import HTTPXMock -from firebolt.common.settings import Settings from firebolt.db.connection import Connection from firebolt.db.util import is_db_available, is_engine_running -from firebolt.utils.urls import DATABASES_URL, ENGINES_URL def test_is_db_available( connection: Connection, httpx_mock: HTTPXMock, - settings: Settings, - auth_url: str, - auth_callback: Callable, + query_statistics: Dict[str, Any], + system_engine_query_url: str, ): - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_response( - url=f"https://{settings.server}{DATABASES_URL}?filter.name_contains=dummy", - method="GET", - json={"edges": ["one"]}, + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": ["my_db"], + "meta": [], + "statistics": query_statistics, + }, ) assert is_db_available(connection, "dummy") == True @@ -27,46 +28,72 @@ def test_is_db_available( def test_is_db_not_available( connection: Connection, httpx_mock: HTTPXMock, - settings: Settings, - auth_url: str, - auth_callback: Callable, + system_engine_query_url: str, + query_statistics: Dict[str, Any], ): - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_response( - url=f"https://{settings.server}{DATABASES_URL}?filter.name_contains=dummy", - method="GET", - json={"edges": []}, + url=system_engine_query_url, + method="POST", + json={ + "rows": "0", + "data": [], + "meta": [], + "statistics": query_statistics, + }, ) assert is_db_available(connection, "dummy") == False +def test_is_engine_running_system( + system_connection: Connection, +): + # System engine is always running + assert is_engine_running(system_connection, "dummy") == True + + def test_is_engine_running( connection: Connection, httpx_mock: HTTPXMock, - settings: Settings, - auth_url: str, - auth_callback: Callable, + system_engine_query_url: str, + query_statistics: Dict[str, Any], + get_engines_url: str, ): - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_response( - url=f"https://{settings.server}{ENGINES_URL}?filter.name_contains=my_engine&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING", - method="GET", - json={"edges": ["one"]}, + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Running"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, ) - assert is_engine_running(connection, "https://my-engine.dev.firebolt.io") == True + assert is_engine_running(connection, get_engines_url) == True def test_is_engine_not_running( connection: Connection, httpx_mock: HTTPXMock, - settings: Settings, - auth_url: str, - auth_callback: Callable, + system_engine_query_url: str, + query_statistics: Dict[str, Any], + get_engines_url: str, ): - httpx_mock.add_callback(auth_callback, url=auth_url) httpx_mock.add_response( - url=f"https://{settings.server}{ENGINES_URL}?filter.name_contains=my_engine&filter.current_status_eq=ENGINE_STATUS_RUNNING_REVISION_SERVING", - method="GET", - json={"edges": []}, + url=system_engine_query_url, + method="POST", + json={ + "rows": "1", + "data": [[get_engines_url, "my_db", "Stopped"]], + "meta": [ + {"name": "url", "type": "text"}, + {"name": "attached_to", "type": "text"}, + {"name": "status", "type": "text"}, + ], + "statistics": query_statistics, + }, ) - assert is_engine_running(connection, "https://my-engine.dev.firebolt.io") == False + assert is_engine_running(connection, get_engines_url) == False diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index 541e4dd35d9..5ed5a951758 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -137,10 +137,26 @@ def server_side_async_id() -> str: @fixture -def server_side_async_cancel_callback(server_side_async_id) -> Response: +def query_statistics() -> Dict[str, Any]: + # Just some dummy statistics to have in query response + return { + "elapsed": 0.116907717, + "rows_read": 1, + "bytes_read": 61, + "time_before_execution": 0.012180623, + "time_to_execute": 0.104614307, + "scanned_bytes_cache": 0, + "scanned_bytes_storage": 0, + } + + +@fixture +def server_side_async_cancel_callback( + server_side_async_id, query_statistics: Dict[str, Any] +) -> Response: def do_query(request: Request, **kwargs) -> Response: # Make sure no set parameters are added - assert list(request.url.params.keys()) == [ + assert sorted(list(request.url.params.keys())) == [ "database", "query_id", ], "invalid query params for async cancel" @@ -158,15 +174,7 @@ def do_query(request: Request, **kwargs) -> Response: ], "data": [["node1.node.consul", 9000, 0, "", 0, 0]], "rows": 1, - "statistics": { - "elapsed": 0.116907717, - "rows_read": 1, - "bytes_read": 61, - "time_before_execution": 0.012180623, - "time_to_execute": 0.104614307, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -241,7 +249,9 @@ def do_query(request: Request, **kwargs) -> Response: @fixture def query_callback( - query_description: List[Column], query_data: List[List[ColType]] + query_description: List[Column], + query_data: List[List[ColType]], + query_statistics: Dict[str, Any], ) -> Callable: def do_query(request: Request, **kwargs) -> Response: assert request.read() != b"" @@ -251,15 +261,7 @@ def do_query(request: Request, **kwargs) -> Response: "meta": [{"name": c.name, "type": c.type_code} for c in query_description], "data": query_data, "rows": len(query_data), - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -296,6 +298,7 @@ def query_with_params_callback( query_description: List[Column], query_data: List[List[ColType]], set_params: Dict, + query_statistics: Dict[str, Any], ) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params @@ -307,16 +310,7 @@ def do_query(request: Request, **kwargs) -> Response: "meta": [{"name": c.name, "type": c.type_code} for c in query_description], "data": query_data, "rows": len(query_data), - # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -361,7 +355,9 @@ def query_with_params_url(query_url: str, set_params: str) -> str: query_url = f"{query_url}&{params_encoded}" -def _get_engine_url_callback(server: str, db_name: str, status="Running") -> Callable: +def _get_engine_url_callback( + server: str, db_name: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params assert ( @@ -375,16 +371,7 @@ def do_query(request: Request, **kwargs) -> Response: "meta": [{"name": "name", "type": "Text"} for _ in range(len(data[0]))], "data": data, "rows": len(data), - # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -392,21 +379,31 @@ def do_query(request: Request, **kwargs) -> Response: @fixture -def get_engine_url_callback(server: str, db_name: str, status="Running") -> Callable: - return _get_engine_url_callback(server, db_name) +def get_engine_url_callback( + server: str, db_name: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: + return _get_engine_url_callback(server, db_name, query_statistics) @fixture -def get_engine_url_not_running_callback(engine_name, db_name) -> Callable: - return _get_engine_url_callback(engine_name, db_name, "Stopped") +def get_engine_url_not_running_callback( + engine_name, db_name, query_statistics: Dict[str, Any] +) -> Callable: + return _get_engine_url_callback(engine_name, db_name, query_statistics, "Stopped") @fixture -def get_engine_url_invalid_db_callback(engine_name, db_name) -> Callable: - return _get_engine_url_callback(engine_name, "not_" + db_name) +def get_engine_url_invalid_db_callback( + engine_name, + db_name, + query_statistics: Dict[str, Any], +) -> Callable: + return _get_engine_url_callback(engine_name, "not_" + db_name, query_statistics) -def _get_default_db_engine_callback(server: str, status="Running") -> Callable: +def _get_default_db_engine_callback( + server: str, query_statistics: Dict[str, Any], status="Running" +) -> Callable: def do_query(request: Request, **kwargs) -> Response: set_parameters = request.url.params assert len(set_parameters) == 1 and "output_format" in set_parameters @@ -416,15 +413,7 @@ def do_query(request: Request, **kwargs) -> Response: "data": data, "rows": len(data), # Real example of statistics field value, not used by our code - "statistics": { - "elapsed": 0.002983335, - "time_before_execution": 0.002729331, - "time_to_execute": 0.000215215, - "rows_read": 1, - "bytes_read": 1, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0, - }, + "statistics": query_statistics, } return Response(status_code=codes.OK, json=query_response) @@ -504,6 +493,21 @@ def inner() -> None: return inner +@fixture +def mock_system_connection_flow( + httpx_mock: HTTPXMock, + auth_url: str, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, +) -> Callable: + def inner() -> None: + httpx_mock.add_callback(check_credentials_callback, url=auth_url) + httpx_mock.add_callback(get_system_engine_callback, url=get_system_engine_url) + + return inner + + @fixture def mock_query( httpx_mock: HTTPXMock, From 2ac33770f07396ec811e05b7d893cf8f17ea9fc6 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 19 Jun 2023 14:34:45 +0100 Subject: [PATCH 4/5] fix precommit --- src/firebolt/db/cursor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 1ffdd7f91c8..87e3ff960d8 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -3,7 +3,6 @@ import logging import re import time -from threading import Lock from typing import ( TYPE_CHECKING, Any, From 1d1b3d110fefb65e97b749e61c0a3068b686de31 Mon Sep 17 00:00:00 2001 From: Petro Tiurin <93913847+ptiurin@users.noreply.github.com> Date: Tue, 20 Jun 2023 18:29:54 +0100 Subject: [PATCH 5/5] Update test_connection.py --- tests/unit/async_db/test_connection.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index bd3d9701879..0f704597909 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -18,12 +18,11 @@ from firebolt.utils.token_storage import TokenSecureStorage +@mark.skip("__slots__ is broken on Connection class") async def test_connection_attributes(connection: Connection) -> None: + """Test that no unexpected values can be set. Governed by __slots__""" with raises(AttributeError): - await connection.not_a_cursor() - - with raises(AttributeError): - connection.not_a_database + connection.not_a_database = "dummy" async def test_closed_connection(connection: Connection) -> None: