diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 3e7d843bb27..b4e55c2fa21 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -672,6 +672,24 @@ will send a cancel request to the server and the query will be stopped. print(successful) # False +Retrieving asynchronous query information +----------------------------------------- + +To get additional information about an async query, use the :py:meth:`firebolt.db.connection.Connection.get_async_query_info` method. +This method returns a list of ``AsyncQueryInfo`` objects, each containing detailed information about the query execution. + +:: + + token = cursor.async_query_token + query_info_list = connection.get_async_query_info(token) + + for query_info in query_info_list: + print(f"Query ID: {query_info.query_id}") + print(f"Status: {query_info.status}") + print(f"Submitted time: {query_info.submitted_time}") + print(f"Rows scanned: {query_info.scanned_rows}") + print(f"Error message: {query_info.error_message}") + Streaming query results ============================== diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 05603bbe10d..62f6f215330 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -17,6 +17,7 @@ ASYNC_QUERY_STATUS_SUCCESSFUL, AsyncQueryInfo, BaseConnection, + _parse_async_query_info_results, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -92,34 +93,41 @@ def cursor(self, **kwargs: Any) -> Cursor: return c # Server-side async methods - async def _get_async_query_info(self, token: str) -> AsyncQueryInfo: + async def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]: + """ + Retrieve information about an asynchronous query using its token. + This method fetches the status and details of an asynchronous query + identified by the provided token. + Args: + token (str): The token identifying the asynchronous query. + Returns: + List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing + details about the asynchronous query. + """ + if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." ) cursor = self.cursor() await cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token]) - result = await cursor.fetchone() - if cursor.rowcount != 1 or not result: + results = await cursor.fetchall() + if not results: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description - result_dict = dict(zip([column.name for column in columns], result)) + columns_names = [column.name for column in columns] + return _parse_async_query_info_results(results, columns_names) - if not result_dict.get("status") or not result_dict.get("query_id"): - raise FireboltError( - "Something went wrong - async query status request returned " - "unexpected result with status and/or query id missing. " - "Rerun the command and reach out to Firebolt support if " - "the issue persists." + def _raise_if_multiple_async_results( + self, async_query_info: List[AsyncQueryInfo] + ) -> None: + # We expect only one result in current implementation + if len(async_query_info) != 1: + raise NotImplementedError( + "Async query status request returned more than one result. " + "This is not supported yet." ) - # Only pass the expected keys to AsyncQueryInfo - filtered_result_dict = { - k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields - } - - return AsyncQueryInfo(**filtered_result_dict) - async def is_async_query_running(self, token: str) -> bool: """ Check if an async query is still running. @@ -130,8 +138,10 @@ async def is_async_query_running(self, token: str) -> bool: Returns: bool: True if async query is still running, False otherwise """ - async_query_details = await self._get_async_query_info(token) - return async_query_details.status == ASYNC_QUERY_STATUS_RUNNING + async_query_info = await self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info) + # We expect only one result + return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING async def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -144,10 +154,12 @@ async def is_async_query_successful(self, token: str) -> Optional[bool]: bool: None if the query is still running, True if successful, False otherwise """ - async_query_details = await self._get_async_query_info(token) - if async_query_details.status == ASYNC_QUERY_STATUS_RUNNING: + async_query_info_list = await self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info_list) + async_query_info = async_query_info_list[0] + if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING: return None - return async_query_details.status == ASYNC_QUERY_STATUS_SUCCESSFUL + return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL async def cancel_async_query(self, token: str) -> None: """ @@ -156,10 +168,10 @@ async def cancel_async_query(self, token: str) -> None: Args: token: Async query token. Can be obtained from Cursor.async_query_token. """ - async_query_details = await self._get_async_query_info(token) - async_query_id = async_query_details.query_id + async_query_info = await self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info) cursor = self.cursor() - await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id]) + await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id]) # Context manager support async def __aenter__(self) -> Connection: diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 6697fee2c77..24880c60bd5 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,7 +1,8 @@ from collections import namedtuple from typing import Any, List, Type -from firebolt.utils.exception import ConnectionClosedError +from firebolt.common._types import ColType +from firebolt.utils.exception import ConnectionClosedError, FireboltError ASYNC_QUERY_STATUS_RUNNING = "RUNNING" ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" @@ -27,6 +28,30 @@ ) +def _parse_async_query_info_results( + result: List[List[ColType]], columns_names: List[str] +) -> List[AsyncQueryInfo]: + async_query_infos = [] + for row in result: + result_dict = dict(zip(columns_names, row)) + + if not result_dict.get("status") or not result_dict.get("query_id"): + raise FireboltError( + "Something went wrong - async query status request returned " + "unexpected result with status and/or query id missing. " + "Rerun the command and reach out to Firebolt support if " + "the issue persists." + ) + + # Only pass the expected keys to AsyncQueryInfo + filtered_result_dict = { + k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields + } + + async_query_infos.append(AsyncQueryInfo(**filtered_result_dict)) + return async_query_infos + + class BaseConnection: def __init__(self, cursor_type: Type) -> None: self.cursor_type = cursor_type diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 8f740560dce..07b10f4ddc1 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -16,6 +16,7 @@ ASYNC_QUERY_STATUS_SUCCESSFUL, AsyncQueryInfo, BaseConnection, + _parse_async_query_info_results, ) from firebolt.common.cache import _firebolt_system_engine_cache from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS @@ -230,34 +231,41 @@ def close(self) -> None: # Server-side async methods - def _get_async_query_info(self, token: str) -> AsyncQueryInfo: + def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]: + """ + Retrieve information about an asynchronous query using its token. + This method fetches the status and details of an asynchronous query + identified by the provided token. + Args: + token (str): The token identifying the asynchronous query. + Returns: + List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing + details about the asynchronous query. + """ + if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." ) cursor = self.cursor() cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token]) - result = cursor.fetchone() - if cursor.rowcount != 1 or not result: + results = cursor.fetchall() + if not results: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description - result_dict = dict(zip([column.name for column in columns], result)) + columns_names = [column.name for column in columns] + return _parse_async_query_info_results(results, columns_names) - if not result_dict.get("status") or not result_dict.get("query_id"): - raise FireboltError( - "Something went wrong - async query status request returned " - "unexpected result with status and/or query id missing. " - "Rerun the command and reach out to Firebolt support if " - "the issue persists." + def _raise_if_multiple_async_results( + self, async_query_info: List[AsyncQueryInfo] + ) -> None: + # We expect only one result in current implementation + if len(async_query_info) != 1: + raise NotImplementedError( + "Async query status request returned more than one result. " + "This is not supported yet." ) - # Only pass the expected keys to AsyncQueryInfo - filtered_result_dict = { - k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields - } - - return AsyncQueryInfo(**filtered_result_dict) - def is_async_query_running(self, token: str) -> bool: """ Check if an async query is still running. @@ -268,7 +276,10 @@ def is_async_query_running(self, token: str) -> bool: Returns: bool: True if async query is still running, False otherwise """ - return self._get_async_query_info(token).status == ASYNC_QUERY_STATUS_RUNNING + async_query_info = self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info) + # We expect only one result + return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -281,7 +292,9 @@ def is_async_query_successful(self, token: str) -> Optional[bool]: bool: None if the query is still running, True if successful, False otherwise """ - async_query_info = self._get_async_query_info(token) + async_query_info_list = self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info_list) + async_query_info = async_query_info_list[0] if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING: return None return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL @@ -293,9 +306,10 @@ def cancel_async_query(self, token: str) -> None: Args: token: Async query token. Can be obtained from Cursor.async_query_token. """ - async_query_id = self._get_async_query_info(token).query_id + async_query_info = self.get_async_query_info(token) + self._raise_if_multiple_async_results(async_query_info) cursor = self.cursor() - cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id]) + cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id]) # Context manager support def __enter__(self) -> Connection: diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py index f6ed4452b39..cb7f2b1e27e 100644 --- a/tests/integration/dbapi/async/V2/test_server_async.py +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -4,7 +4,7 @@ from pytest import raises -from firebolt.db import Connection +from firebolt.async_db import Connection from firebolt.utils.exception import FireboltError, FireboltStructuredError LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec @@ -30,6 +30,21 @@ async def test_insert_async(connection: Connection) -> None: await cursor.execute(f"SELECT * FROM {table_name}") result = await cursor.fetchall() assert result == [[1, "test"]] + info = await connection.get_async_query_info(token) + assert len(info) == 1 + # Verify query id is showing in query history + for _ in range(3): + await cursor.execute( + "SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?", + [info[0].query_id], + ) + query_history_result = await cursor.fetchall() + if len(query_history_result) != 0: + break + # Sometimes it takes a while for the query history to be updated + # so we will retry a few times + time.sleep(10) + assert len(query_history_result) == 1 finally: await cursor.execute(f"DROP TABLE {table_name}") @@ -50,7 +65,7 @@ async def test_insert_async_running(connection: Connection) -> None: async def test_check_async_execution_from_another_connection( - connection_factory: Callable[..., Connection] + connection_factory: Callable[..., Connection], ) -> None: connection_1 = await connection_factory() connection_2 = await connection_factory() diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py index c96c65ec456..993b0923d20 100644 --- a/tests/integration/dbapi/sync/V2/test_server_async.py +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -28,6 +28,21 @@ def test_insert_async(connection: Connection) -> None: cursor.execute(f"SELECT * FROM {table_name}") result = cursor.fetchall() assert result == [[1, "test"]] + info = connection.get_async_query_info(token) + assert len(info) == 1 + # Verify query id is showing in query history + for _ in range(3): + cursor.execute( + "SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?", + [info[0].query_id], + ) + query_history_result = cursor.fetchall() + if len(query_history_result) != 0: + break + # Sometimes it takes a while for the query history to be updated + # so we will retry a few times + time.sleep(10) + assert len(query_history_result) == 1 finally: cursor.execute(f"DROP TABLE {table_name}") @@ -48,7 +63,7 @@ def test_insert_async_running(connection: Connection) -> None: def test_check_async_execution_from_another_connection( - connection_factory: Callable[..., Connection] + connection_factory: Callable[..., Connection], ) -> None: connection_1 = connection_factory() connection_2 = connection_factory() diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index d928769cf13..f1d357672b3 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -578,3 +578,93 @@ async def test_async_query_cancellation( api_endpoint=api_endpoint, ) as connection: await connection.cancel_async_query("token") + + +async def test_get_async_query_info( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test get_async_query_info method""" + mock_connection_flow() + async_query_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + result = await connection.get_async_query_info("token") + + # Verify we got a list with one AsyncQueryInfo object + assert len(result) == 1 + expected_server_status = async_query_data[0][5] + assert result[0].status == expected_server_status + + # Verify query_id matches the expected value from the data + expected_query_id = async_query_data[0][7] # Index of query_id in data + assert result[0].query_id == expected_query_id + + +async def test_multiple_results_for_async_token( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_multiple_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """ + Test get_async_query_info method with multiple results for the same token. + This future-proofs the code against changes in the server response. + """ + mock_connection_flow() + async_query_callback = async_query_callback_factory( + async_multiple_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(NotImplementedError): + await connection.is_async_query_successful("token") + with raises(NotImplementedError): + await connection.is_async_query_running("token") + + query_info = await connection.get_async_query_info("token") + assert len(query_info) == 2, "Expected two results for the same token" + assert query_info[0].query_id == async_multiple_query_data[0][7] + assert query_info[1].query_id == async_multiple_query_data[1][7] diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index 024816e677a..c6e94ea1231 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -596,3 +596,93 @@ def test_async_query_cancellation( api_endpoint=api_endpoint, ) as connection: connection.cancel_async_query("token") + + +def test_get_async_query_info( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test get_async_query_info method""" + mock_connection_flow() + async_query_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + result = connection.get_async_query_info("token") + + # Verify we got a list with one AsyncQueryInfo object + assert len(result) == 1 + expected_server_status = async_query_data[0][5] + assert result[0].status == expected_server_status + + # Verify query_id matches the expected value from the data + expected_query_id = async_query_data[0][7] # Index of query_id in data + assert result[0].query_id == expected_query_id + + +def test_multiple_results_for_async_token( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_multiple_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """ + Test get_async_query_info method with multiple results for the same token. + This future-proofs the code against changes in the server response. + """ + mock_connection_flow() + async_query_callback = async_query_callback_factory( + async_multiple_query_data, async_query_meta + ) + + httpx_mock.add_callback( + async_query_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(NotImplementedError): + connection.is_async_query_successful("token") + with raises(NotImplementedError): + connection.is_async_query_running("token") + + query_info = connection.get_async_query_info("token") + assert len(query_info) == 2, "Expected two results for the same token" + assert query_info[0].query_id == async_multiple_query_data[0][7] + assert query_info[1].query_id == async_multiple_query_data[1][7] diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index 9590acad627..e19fa22023c 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -603,6 +603,41 @@ def async_query_data() -> List[List[ColType]]: return query_data +@fixture +def async_multiple_query_data() -> List[List[ColType]]: + query_data = [ + [ + "developer", + "ecosystem_ci", + "2025-01-23 14:08:06.087953+00", + "2025-01-23 14:08:06.134208+00", + "2025-01-23 14:08:06.410542+00", + "ENDED_SUCCESSFULLY", + "123e4567-e89b-12d3-a456-426614174000", + "f9520387-224c-48e9-9858-b2d05518ce94", + "", + "2", + "2", + "0", + ], + [ + "developer", + "ecosystem_ci", + "2025-01-23 14:08:06.087953+00", + "2025-01-23 14:08:06.134208+00", + "2025-01-23 14:08:06.410542+00", + "RUNNING", + "", + "987e6543-e21b-34d3-b654-426614174111", + "", + "2", + "2", + "0", + ], + ] + return query_data + + @fixture def async_query_meta() -> List[Tuple[str, str]]: query_meta = [