From 758e8ac492b03affdcbbe643d713e4ceabe2d5a6 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 6 Feb 2025 15:42:44 +0000 Subject: [PATCH 1/3] feat(FIR-43324): async cancellation method --- src/firebolt/async_db/connection.py | 26 ++++++++--- src/firebolt/db/connection.py | 23 +++++++--- .../dbapi/async/V2/test_server_async.py | 20 ++++++++ .../dbapi/sync/V2/test_server_async.py | 20 ++++++++ tests/unit/async_db/test_connection.py | 46 +++++++++++++++++++ tests/unit/db/test_connection.py | 46 +++++++++++++++++++ 6 files changed, 168 insertions(+), 13 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 49a31dc6650..908980e33b8 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -90,7 +90,7 @@ def cursor(self, **kwargs: Any) -> Cursor: return c # Server-side async methods - async def _get_async_query_status(self, token: str) -> str: + async def _get_async_query_info(self, token: str) -> Dict[str, Any]: if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." @@ -102,7 +102,7 @@ async def _get_async_query_status(self, token: str) -> str: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) - return str(result_dict.get("status")) + return result_dict async def is_async_query_running(self, token: str) -> bool: """ @@ -114,8 +114,8 @@ async def is_async_query_running(self, token: str) -> bool: Returns: bool: True if async query is still running, False otherwise """ - status = await self._get_async_query_status(token) - return status == ASYNC_QUERY_STATUS_RUNNING + async_query_details = await self._get_async_query_info(token) + return async_query_details["status"] == ASYNC_QUERY_STATUS_RUNNING async def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -128,10 +128,22 @@ async def is_async_query_successful(self, token: str) -> Optional[bool]: bool: None if the query is still running, True if successful, False otherwise """ - status = await self._get_async_query_status(token) - if status == ASYNC_QUERY_STATUS_RUNNING: + async_query_details = await self._get_async_query_info(token) + if async_query_details["status"] == ASYNC_QUERY_STATUS_RUNNING: return None - return status == ASYNC_QUERY_STATUS_SUCCESSFUL + return async_query_details["status"] == ASYNC_QUERY_STATUS_SUCCESSFUL + + async def cancel_async_query(self, token: str) -> None: + """ + Cancel an async query. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + """ + async_query_details = await self._get_async_query_info(token) + async_query_id = async_query_details["query_id"] + cursor = self.cursor() + await cursor.execute("CANCEL QUERY WHERE query_id=?", [async_query_id]) # Context manager support async def __aenter__(self) -> Connection: diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index aeab156dd9d..af9302f975e 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -227,7 +227,7 @@ def close(self) -> None: self._is_closed = True # Server-side async methods - def _get_async_query_status(self, token: str) -> str: + def _get_async_query_info(self, token: str) -> Dict[str, Any]: if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." @@ -239,7 +239,7 @@ def _get_async_query_status(self, token: str) -> str: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) - return result_dict["status"] + return result_dict def is_async_query_running(self, token: str) -> bool: """ @@ -251,7 +251,7 @@ def is_async_query_running(self, token: str) -> bool: Returns: bool: True if async query is still running, False otherwise """ - return self._get_async_query_status(token) == ASYNC_QUERY_STATUS_RUNNING + return self._get_async_query_info(token)["status"] == ASYNC_QUERY_STATUS_RUNNING def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -264,10 +264,21 @@ def is_async_query_successful(self, token: str) -> Optional[bool]: bool: None if the query is still running, True if successful, False otherwise """ - status = self._get_async_query_status(token) - if status == ASYNC_QUERY_STATUS_RUNNING: + async_query_info = self._get_async_query_info(token) + if async_query_info["status"] == ASYNC_QUERY_STATUS_RUNNING: return None - return status == ASYNC_QUERY_STATUS_SUCCESSFUL + return async_query_info["status"] == ASYNC_QUERY_STATUS_SUCCESSFUL + + def cancel_async_query(self, token: str) -> None: + """ + Cancel an async query. + + Args: + token: Async query token. Can be obtained from Cursor.async_query_token. + """ + async_query_id = self._get_async_query_info(token)["query_id"] + cursor = self.cursor() + cursor.execute("CANCEL QUERY WHERE query_id=?", [async_query_id]) # Context manager support def __enter__(self) -> Connection: diff --git a/tests/integration/dbapi/async/V2/test_server_async.py b/tests/integration/dbapi/async/V2/test_server_async.py index 2875fc07778..f6ed4452b39 100644 --- a/tests/integration/dbapi/async/V2/test_server_async.py +++ b/tests/integration/dbapi/async/V2/test_server_async.py @@ -102,3 +102,23 @@ async def test_check_async_execution_fails(connection: Connection) -> None: await cursor.execute_async(f"MALFORMED QUERY") with raises(FireboltError): cursor.async_query_token + + +async def test_cancel_async_query(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + await cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + await cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + assert await connection.is_async_query_running(token) == True + await connection.cancel_async_query(token) + assert await connection.is_async_query_running(token) == False + assert await connection.is_async_query_successful(token) == False + await cursor.execute(f"SELECT * FROM {table_name}") + result = await cursor.fetchall() + assert result == [] + finally: + await cursor.execute(f"DROP TABLE {table_name}") diff --git a/tests/integration/dbapi/sync/V2/test_server_async.py b/tests/integration/dbapi/sync/V2/test_server_async.py index cb5b41eb12c..c96c65ec456 100644 --- a/tests/integration/dbapi/sync/V2/test_server_async.py +++ b/tests/integration/dbapi/sync/V2/test_server_async.py @@ -98,3 +98,23 @@ def test_check_async_execution_fails(connection: Connection) -> None: cursor.execute_async(f"MALFORMED QUERY") with raises(FireboltError): cursor.async_query_token + + +def test_cancel_async_query(connection: Connection) -> None: + cursor = connection.cursor() + rnd_suffix = str(randint(0, 1000)) + table_name = f"test_insert_async_{rnd_suffix}" + try: + cursor.execute(f"CREATE TABLE {table_name} (id LONG)") + cursor.execute_async(f"INSERT INTO {table_name} {LONG_SELECT}") + token = cursor.async_query_token + assert token is not None, "Async token was not returned" + assert connection.is_async_query_running(token) == True + connection.cancel_async_query(token) + assert connection.is_async_query_running(token) == False + assert connection.is_async_query_successful(token) == False + cursor.execute(f"SELECT * FROM {table_name}") + result = cursor.fetchall() + assert result == [] + finally: + cursor.execute(f"DROP TABLE {table_name}") diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 801f9353962..680dde5324f 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -492,3 +492,49 @@ async def test_async_query_status_unexpected_result( await connection.is_async_query_running("token") with raises(FireboltError): await connection.is_async_query_successful("token") + + +async def test_async_query_cancellation( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + query_callback: Callable, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test async query cancellation""" + mock_connection_flow() + async_query_data[0][5] = "RUNNING" + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + query_dict = dict(zip([m[0] for m in async_query_meta], async_query_data[0])) + query_id = query_dict["query_id"] + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + httpx_mock.add_callback( + query_callback, + url=query_url, + match_content=f"CANCEL QUERY WHERE query_id='{query_id}'".encode("utf-8"), + ) + + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + await connection.cancel_async_query("token") diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index 87c21e3aba7..d5cdbaaaef5 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -510,3 +510,49 @@ def test_async_query_status_unexpected_result( connection.is_async_query_running("token") with raises(FireboltError): connection.is_async_query_successful("token") + + +def test_async_query_cancellation( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + query_callback: Callable, + async_query_callback_factory: Callable, + async_query_data: List[List[ColType]], + async_query_meta: List[Tuple[str, str]], + mock_connection_flow: Callable, +): + """Test is_async_query_running method""" + mock_connection_flow() + async_query_data[0][5] = "RUNNING" + async_query_status_running_callback = async_query_callback_factory( + async_query_data, async_query_meta + ) + + query_dict = dict(zip([m[0] for m in async_query_meta], async_query_data[0])) + query_id = query_dict["query_id"] + + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + + httpx_mock.add_callback( + query_callback, + url=query_url, + match_content=f"CANCEL QUERY WHERE query_id='{query_id}'".encode("utf-8"), + ) + + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + connection.cancel_async_query("token") From 2803f61a1041abaab063b5e18e36a947e7a87513 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Thu, 6 Feb 2025 16:16:42 +0000 Subject: [PATCH 2/3] add doc --- docsrc/Connecting_and_queries.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 021a380a16a..7f35074299f 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -655,6 +655,23 @@ has finished successfully, None if query is still running and False if the query else: print("Query failed") +Cancelling a running query +-------------------------- + +To cancel a running query, use the :py:meth:`firebolt.db.connection.Connection.cancel_async_query` method. This method +will send a cancel request to the server and the query will be stopped. + +:: + + token = cursor.async_query_token + connection.cancel_async_query(token) + + # Verify that the query was cancelled + running = connection.is_async_query_running(token) + print(running) # False + successful = connection.is_async_query_successful(token) + print(successful) # False + Thread safety ============================== From 68521f1a00a3672c6f494daa4b54a0707e4f0e3b Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 11 Feb 2025 11:57:50 +0000 Subject: [PATCH 3/3] Refactor to add AsyncQueryInfo tuple --- src/firebolt/async_db/connection.py | 32 +++++++++++++++------ src/firebolt/common/base_connection.py | 22 +++++++++++++- src/firebolt/db/connection.py | 33 +++++++++++++++------ tests/unit/async_db/test_connection.py | 40 ++++++++++++++++++++++++++ tests/unit/db/test_connection.py | 40 ++++++++++++++++++++++++++ tests/unit/db_conftest.py | 21 ++++++++++++++ 6 files changed, 171 insertions(+), 17 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 908980e33b8..139a817895b 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -11,9 +11,11 @@ from firebolt.client.auth import Auth from firebolt.client.client import AsyncClient, AsyncClientV1, AsyncClientV2 from firebolt.common.base_connection import ( + ASYNC_QUERY_CANCEL, ASYNC_QUERY_STATUS_REQUEST, ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, + AsyncQueryInfo, BaseConnection, ) from firebolt.common.cache import _firebolt_system_engine_cache @@ -90,19 +92,33 @@ def cursor(self, **kwargs: Any) -> Cursor: return c # Server-side async methods - async def _get_async_query_info(self, token: str) -> Dict[str, Any]: + async def _get_async_query_info(self, token: str) -> AsyncQueryInfo: if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." ) cursor = self.cursor() - await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + await cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token]) result = await cursor.fetchone() if cursor.rowcount != 1 or not result: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) - return result_dict + + if not result_dict.get("status") or not result_dict.get("query_id"): + raise FireboltError( + "Something went wrong - async query status request returned " + "unexpected result with status and/or query id missing. " + "Rerun the command and reach out to Firebolt support if " + "the issue persists." + ) + + # Only pass the expected keys to AsyncQueryInfo + filtered_result_dict = { + k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields + } + + return AsyncQueryInfo(**filtered_result_dict) async def is_async_query_running(self, token: str) -> bool: """ @@ -115,7 +131,7 @@ async def is_async_query_running(self, token: str) -> bool: bool: True if async query is still running, False otherwise """ async_query_details = await self._get_async_query_info(token) - return async_query_details["status"] == ASYNC_QUERY_STATUS_RUNNING + return async_query_details.status == ASYNC_QUERY_STATUS_RUNNING async def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -129,9 +145,9 @@ async def is_async_query_successful(self, token: str) -> Optional[bool]: False otherwise """ async_query_details = await self._get_async_query_info(token) - if async_query_details["status"] == ASYNC_QUERY_STATUS_RUNNING: + if async_query_details.status == ASYNC_QUERY_STATUS_RUNNING: return None - return async_query_details["status"] == ASYNC_QUERY_STATUS_SUCCESSFUL + return async_query_details.status == ASYNC_QUERY_STATUS_SUCCESSFUL async def cancel_async_query(self, token: str) -> None: """ @@ -141,9 +157,9 @@ async def cancel_async_query(self, token: str) -> None: token: Async query token. Can be obtained from Cursor.async_query_token. """ async_query_details = await self._get_async_query_info(token) - async_query_id = async_query_details["query_id"] + async_query_id = async_query_details.query_id cursor = self.cursor() - await cursor.execute("CANCEL QUERY WHERE query_id=?", [async_query_id]) + await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id]) # Context manager support async def __aenter__(self) -> Connection: diff --git a/src/firebolt/common/base_connection.py b/src/firebolt/common/base_connection.py index 80be32d4898..6697fee2c77 100644 --- a/src/firebolt/common/base_connection.py +++ b/src/firebolt/common/base_connection.py @@ -1,10 +1,30 @@ +from collections import namedtuple from typing import Any, List, Type from firebolt.utils.exception import ConnectionClosedError ASYNC_QUERY_STATUS_RUNNING = "RUNNING" ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY" -ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus('{token}')" +ASYNC_QUERY_STATUS_REQUEST = "CALL fb_GetAsyncStatus(?)" +ASYNC_QUERY_CANCEL = "CANCEL QUERY WHERE query_id=?" + +AsyncQueryInfo = namedtuple( + "AsyncQueryInfo", + [ + "account_name", + "user_name", + "submitted_time", + "start_time", + "end_time", + "status", + "request_id", + "query_id", + "error_message", + "scanned_bytes", + "scanned_rows", + "retries", + ], +) class BaseConnection: diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index af9302f975e..8f740560dce 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -10,9 +10,11 @@ from firebolt.client import DEFAULT_API_URL, Client, ClientV1, ClientV2 from firebolt.client.auth import Auth from firebolt.common.base_connection import ( + ASYNC_QUERY_CANCEL, ASYNC_QUERY_STATUS_REQUEST, ASYNC_QUERY_STATUS_RUNNING, ASYNC_QUERY_STATUS_SUCCESSFUL, + AsyncQueryInfo, BaseConnection, ) from firebolt.common.cache import _firebolt_system_engine_cache @@ -227,19 +229,34 @@ def close(self) -> None: self._is_closed = True # Server-side async methods - def _get_async_query_info(self, token: str) -> Dict[str, Any]: + + def _get_async_query_info(self, token: str) -> AsyncQueryInfo: if self.cursor_type != CursorV2: raise FireboltError( "This method is only supported for connection with service account." ) cursor = self.cursor() - cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token)) + cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token]) result = cursor.fetchone() if cursor.rowcount != 1 or not result: raise FireboltError("Unexpected result from async query status request.") columns = cursor.description result_dict = dict(zip([column.name for column in columns], result)) - return result_dict + + if not result_dict.get("status") or not result_dict.get("query_id"): + raise FireboltError( + "Something went wrong - async query status request returned " + "unexpected result with status and/or query id missing. " + "Rerun the command and reach out to Firebolt support if " + "the issue persists." + ) + + # Only pass the expected keys to AsyncQueryInfo + filtered_result_dict = { + k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields + } + + return AsyncQueryInfo(**filtered_result_dict) def is_async_query_running(self, token: str) -> bool: """ @@ -251,7 +268,7 @@ def is_async_query_running(self, token: str) -> bool: Returns: bool: True if async query is still running, False otherwise """ - return self._get_async_query_info(token)["status"] == ASYNC_QUERY_STATUS_RUNNING + return self._get_async_query_info(token).status == ASYNC_QUERY_STATUS_RUNNING def is_async_query_successful(self, token: str) -> Optional[bool]: """ @@ -265,9 +282,9 @@ def is_async_query_successful(self, token: str) -> Optional[bool]: False otherwise """ async_query_info = self._get_async_query_info(token) - if async_query_info["status"] == ASYNC_QUERY_STATUS_RUNNING: + if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING: return None - return async_query_info["status"] == ASYNC_QUERY_STATUS_SUCCESSFUL + return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL def cancel_async_query(self, token: str) -> None: """ @@ -276,9 +293,9 @@ def cancel_async_query(self, token: str) -> None: Args: token: Async query token. Can be obtained from Cursor.async_query_token. """ - async_query_id = self._get_async_query_info(token)["query_id"] + async_query_id = self._get_async_query_info(token).query_id cursor = self.cursor() - cursor.execute("CANCEL QUERY WHERE query_id=?", [async_query_id]) + cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id]) # Context manager support def __enter__(self) -> Connection: diff --git a/tests/unit/async_db/test_connection.py b/tests/unit/async_db/test_connection.py index 680dde5324f..d928769cf13 100644 --- a/tests/unit/async_db/test_connection.py +++ b/tests/unit/async_db/test_connection.py @@ -494,6 +494,46 @@ async def test_async_query_status_unexpected_result( await connection.is_async_query_successful("token") +async def test_async_query_status_no_id_or_status( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + async_query_data: List[List[ColType]], + mock_connection_flow: Callable, +): + mock_connection_flow() + data_no_query_id = async_query_data[0].copy() + data_no_query_id[7] = "" + data_no_query_status = async_query_data[0].copy() + data_no_query_status[5] = "" + for data_case in [data_no_query_id, data_no_query_status]: + async_query_status_running_callback = async_query_callback_factory( + [data_case], async_query_meta + ) + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + async with await connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + await connection.is_async_query_running("token") + with raises(FireboltError): + await connection.is_async_query_successful("token") + + async def test_async_query_cancellation( db_name: str, account_name: str, diff --git a/tests/unit/db/test_connection.py b/tests/unit/db/test_connection.py index d5cdbaaaef5..024816e677a 100644 --- a/tests/unit/db/test_connection.py +++ b/tests/unit/db/test_connection.py @@ -512,6 +512,46 @@ def test_async_query_status_unexpected_result( connection.is_async_query_successful("token") +def test_async_query_status_no_id_or_status( + db_name: str, + account_name: str, + engine_name: str, + auth: Auth, + api_endpoint: str, + httpx_mock: HTTPXMock, + query_url: str, + async_query_callback_factory: Callable, + async_query_meta: List[Tuple[str, str]], + async_query_data: List[List[ColType]], + mock_connection_flow: Callable, +): + mock_connection_flow() + data_no_query_id = async_query_data[0].copy() + data_no_query_id[7] = "" + data_no_query_status = async_query_data[0].copy() + data_no_query_status[5] = "" + for data_case in [data_no_query_id, data_no_query_status]: + async_query_status_running_callback = async_query_callback_factory( + [data_case], async_query_meta + ) + httpx_mock.add_callback( + async_query_status_running_callback, + url=query_url, + match_content="CALL fb_GetAsyncStatus('token')".encode("utf-8"), + ) + with connect( + database=db_name, + auth=auth, + engine_name=engine_name, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + with raises(FireboltError): + connection.is_async_query_running("token") + with raises(FireboltError): + connection.is_async_query_successful("token") + + def test_async_query_cancellation( db_name: str, account_name: str, diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index cb58fee7d1a..b23a6f6aa13 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -577,6 +577,27 @@ def async_query_meta() -> List[Tuple[str, str]]: return query_meta +@fixture +def async_query_data() -> List[List[ColType]]: + query_data = [ + [ + "developer", + "ecosystem_ci", + "2025-01-23 14:08:06.087953+00", + "2025-01-23 14:08:06.134208+00", + "2025-01-23 14:08:06.410542+00", + "ENDED_SUCCESSFULLY", + "aaa-3333-5555-dddd-ae5et2da3cbe", + "bbb-2222-5555-dddd-b2d0o518ce94", + "", + "2", + "2", + "0", + ] + ] + return query_data + + @fixture def async_query_callback_factory( query_statistics: Dict[str, Any],