Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docsrc/Connecting_and_queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,24 @@ will send a cancel request to the server and the query will be stopped.
print(successful) # False


Retrieving asynchronous query information
-----------------------------------------

To get additional information about an async query, use the :py:meth:`firebolt.db.connection.Connection.get_async_query_info` method.
This method returns a list of ``AsyncQueryInfo`` objects, each containing detailed information about the query execution.

::

    token = cursor.async_query_token
    query_info_list = connection.get_async_query_info(token)

    for query_info in query_info_list:
        print(f"Query ID: {query_info.query_id}")
        print(f"Status: {query_info.status}")
        print(f"Submitted time: {query_info.submitted_time}")
        print(f"Rows scanned: {query_info.scanned_rows}")
        print(f"Error message: {query_info.error_message}")


Streaming query results
==============================
Expand Down
62 changes: 37 additions & 25 deletions src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ASYNC_QUERY_STATUS_SUCCESSFUL,
AsyncQueryInfo,
BaseConnection,
_parse_async_query_info_results,
)
from firebolt.common.cache import _firebolt_system_engine_cache
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
Expand Down Expand Up @@ -92,34 +93,41 @@ def cursor(self, **kwargs: Any) -> Cursor:
return c

# Server-side async methods
async def _get_async_query_info(self, token: str) -> AsyncQueryInfo:
async def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]:
"""
Retrieve information about an asynchronous query using its token.
This method fetches the status and details of an asynchronous query
identified by the provided token.
Args:
token (str): The token identifying the asynchronous query.
Returns:
List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing
details about the asynchronous query.
"""

if self.cursor_type != CursorV2:
raise FireboltError(
"This method is only supported for connection with service account."
)
cursor = self.cursor()
await cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token])
result = await cursor.fetchone()
if cursor.rowcount != 1 or not result:
results = await cursor.fetchall()
if not results:
raise FireboltError("Unexpected result from async query status request.")
columns = cursor.description
result_dict = dict(zip([column.name for column in columns], result))
columns_names = [column.name for column in columns]
return _parse_async_query_info_results(results, columns_names)

if not result_dict.get("status") or not result_dict.get("query_id"):
raise FireboltError(
"Something went wrong - async query status request returned "
"unexpected result with status and/or query id missing. "
"Rerun the command and reach out to Firebolt support if "
"the issue persists."
def _raise_if_multiple_async_results(
self, async_query_info: List[AsyncQueryInfo]
) -> None:
# We expect only one result in current implementation
if len(async_query_info) != 1:
raise NotImplementedError(
"Async query status request returned more than one result. "
"This is not supported yet."
)

# Only pass the expected keys to AsyncQueryInfo
filtered_result_dict = {
k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields
}

return AsyncQueryInfo(**filtered_result_dict)

async def is_async_query_running(self, token: str) -> bool:
"""
Check if an async query is still running.
Expand All @@ -130,8 +138,10 @@ async def is_async_query_running(self, token: str) -> bool:
Returns:
bool: True if async query is still running, False otherwise
"""
async_query_details = await self._get_async_query_info(token)
return async_query_details.status == ASYNC_QUERY_STATUS_RUNNING
async_query_info = await self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info)
# We expect only one result
return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING

async def is_async_query_successful(self, token: str) -> Optional[bool]:
"""
Expand All @@ -144,10 +154,12 @@ async def is_async_query_successful(self, token: str) -> Optional[bool]:
bool: None if the query is still running, True if successful,
False otherwise
"""
async_query_details = await self._get_async_query_info(token)
if async_query_details.status == ASYNC_QUERY_STATUS_RUNNING:
async_query_info_list = await self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info_list)
async_query_info = async_query_info_list[0]
if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING:
return None
return async_query_details.status == ASYNC_QUERY_STATUS_SUCCESSFUL
return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL

async def cancel_async_query(self, token: str) -> None:
"""
Expand All @@ -156,10 +168,10 @@ async def cancel_async_query(self, token: str) -> None:
Args:
token: Async query token. Can be obtained from Cursor.async_query_token.
"""
async_query_details = await self._get_async_query_info(token)
async_query_id = async_query_details.query_id
async_query_info = await self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info)
cursor = self.cursor()
await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id])
await cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id])

# Context manager support
async def __aenter__(self) -> Connection:
Expand Down
27 changes: 26 additions & 1 deletion src/firebolt/common/base_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from collections import namedtuple
from typing import Any, List, Type

from firebolt.utils.exception import ConnectionClosedError
from firebolt.common._types import ColType
from firebolt.utils.exception import ConnectionClosedError, FireboltError

ASYNC_QUERY_STATUS_RUNNING = "RUNNING"
ASYNC_QUERY_STATUS_SUCCESSFUL = "ENDED_SUCCESSFULLY"
Expand All @@ -27,6 +28,30 @@
)


def _parse_async_query_info_results(
    result: List[List[ColType]], columns_names: List[str]
) -> List[AsyncQueryInfo]:
    """
    Convert raw async query status rows into AsyncQueryInfo objects.

    Args:
        result (List[List[ColType]]): Rows of the async query status result;
            values are paired with ``columns_names`` by position.
        columns_names (List[str]): Column names of the result set, in order.

    Returns:
        List[AsyncQueryInfo]: One AsyncQueryInfo per input row, in row order.

    Raises:
        FireboltError: If a row has a missing or empty ``status`` or
            ``query_id`` value.
    """

    def _row_to_info(row: List[ColType]) -> AsyncQueryInfo:
        named_values = dict(zip(columns_names, row))

        if not (named_values.get("status") and named_values.get("query_id")):
            raise FireboltError(
                "Something went wrong - async query status request returned "
                "unexpected result with status and/or query id missing. "
                "Rerun the command and reach out to Firebolt support if "
                "the issue persists."
            )

        # Columns that AsyncQueryInfo does not declare are dropped so they
        # are never passed to its constructor.
        known_values = {
            field: named_values[field]
            for field in AsyncQueryInfo._fields
            if field in named_values
        }
        return AsyncQueryInfo(**known_values)

    return [_row_to_info(row) for row in result]


class BaseConnection:
def __init__(self, cursor_type: Type) -> None:
self.cursor_type = cursor_type
Expand Down
56 changes: 35 additions & 21 deletions src/firebolt/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ASYNC_QUERY_STATUS_SUCCESSFUL,
AsyncQueryInfo,
BaseConnection,
_parse_async_query_info_results,
)
from firebolt.common.cache import _firebolt_system_engine_cache
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
Expand Down Expand Up @@ -230,34 +231,41 @@ def close(self) -> None:

# Server-side async methods

def _get_async_query_info(self, token: str) -> AsyncQueryInfo:
def get_async_query_info(self, token: str) -> List[AsyncQueryInfo]:
"""
Retrieve information about an asynchronous query using its token.
This method fetches the status and details of an asynchronous query
identified by the provided token.
Args:
token (str): The token identifying the asynchronous query.
Returns:
List[AsyncQueryInfo]: A list of AsyncQueryInfo objects containing
details about the asynchronous query.
"""

if self.cursor_type != CursorV2:
raise FireboltError(
"This method is only supported for connection with service account."
)
cursor = self.cursor()
cursor.execute(ASYNC_QUERY_STATUS_REQUEST, [token])
result = cursor.fetchone()
if cursor.rowcount != 1 or not result:
results = cursor.fetchall()
if not results:
raise FireboltError("Unexpected result from async query status request.")
columns = cursor.description
result_dict = dict(zip([column.name for column in columns], result))
columns_names = [column.name for column in columns]
return _parse_async_query_info_results(results, columns_names)

if not result_dict.get("status") or not result_dict.get("query_id"):
raise FireboltError(
"Something went wrong - async query status request returned "
"unexpected result with status and/or query id missing. "
"Rerun the command and reach out to Firebolt support if "
"the issue persists."
def _raise_if_multiple_async_results(
self, async_query_info: List[AsyncQueryInfo]
) -> None:
# We expect only one result in current implementation
if len(async_query_info) != 1:
raise NotImplementedError(
"Async query status request returned more than one result. "
"This is not supported yet."
)

# Only pass the expected keys to AsyncQueryInfo
filtered_result_dict = {
k: v for k, v in result_dict.items() if k in AsyncQueryInfo._fields
}

return AsyncQueryInfo(**filtered_result_dict)

def is_async_query_running(self, token: str) -> bool:
"""
Check if an async query is still running.
Expand All @@ -268,7 +276,10 @@ def is_async_query_running(self, token: str) -> bool:
Returns:
bool: True if async query is still running, False otherwise
"""
return self._get_async_query_info(token).status == ASYNC_QUERY_STATUS_RUNNING
async_query_info = self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info)
# We expect only one result
return async_query_info[0].status == ASYNC_QUERY_STATUS_RUNNING

def is_async_query_successful(self, token: str) -> Optional[bool]:
"""
Expand All @@ -281,7 +292,9 @@ def is_async_query_successful(self, token: str) -> Optional[bool]:
bool: None if the query is still running, True if successful,
False otherwise
"""
async_query_info = self._get_async_query_info(token)
async_query_info_list = self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info_list)
async_query_info = async_query_info_list[0]
if async_query_info.status == ASYNC_QUERY_STATUS_RUNNING:
return None
return async_query_info.status == ASYNC_QUERY_STATUS_SUCCESSFUL
Expand All @@ -293,9 +306,10 @@ def cancel_async_query(self, token: str) -> None:
Args:
token: Async query token. Can be obtained from Cursor.async_query_token.
"""
async_query_id = self._get_async_query_info(token).query_id
async_query_info = self.get_async_query_info(token)
self._raise_if_multiple_async_results(async_query_info)
cursor = self.cursor()
cursor.execute(ASYNC_QUERY_CANCEL, [async_query_id])
cursor.execute(ASYNC_QUERY_CANCEL, [async_query_info[0].query_id])

# Context manager support
def __enter__(self) -> Connection:
Expand Down
19 changes: 17 additions & 2 deletions tests/integration/dbapi/async/V2/test_server_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pytest import raises

from firebolt.db import Connection
from firebolt.async_db import Connection
from firebolt.utils.exception import FireboltError, FireboltStructuredError

LONG_SELECT = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)" # approx 3 sec
Expand All @@ -30,6 +30,21 @@ async def test_insert_async(connection: Connection) -> None:
await cursor.execute(f"SELECT * FROM {table_name}")
result = await cursor.fetchall()
assert result == [[1, "test"]]
info = await connection.get_async_query_info(token)
assert len(info) == 1
# Verify query id is showing in query history
for _ in range(3):
await cursor.execute(
"SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?",
[info[0].query_id],
)
query_history_result = await cursor.fetchall()
if len(query_history_result) != 0:
break
# Sometimes it takes a while for the query history to be updated
# so we will retry a few times
time.sleep(10)
assert len(query_history_result) == 1
finally:
await cursor.execute(f"DROP TABLE {table_name}")

Expand All @@ -50,7 +65,7 @@ async def test_insert_async_running(connection: Connection) -> None:


async def test_check_async_execution_from_another_connection(
connection_factory: Callable[..., Connection]
connection_factory: Callable[..., Connection],
) -> None:
connection_1 = await connection_factory()
connection_2 = await connection_factory()
Expand Down
17 changes: 16 additions & 1 deletion tests/integration/dbapi/sync/V2/test_server_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ def test_insert_async(connection: Connection) -> None:
cursor.execute(f"SELECT * FROM {table_name}")
result = cursor.fetchall()
assert result == [[1, "test"]]
info = connection.get_async_query_info(token)
assert len(info) == 1
# Verify query id is showing in query history
for _ in range(3):
cursor.execute(
"SELECT 1 FROM information_schema.engine_query_history WHERE status='STARTED_EXECUTION' AND query_id = ?",
[info[0].query_id],
)
query_history_result = cursor.fetchall()
if len(query_history_result) != 0:
break
# Sometimes it takes a while for the query history to be updated
# so we will retry a few times
time.sleep(10)
assert len(query_history_result) == 1
finally:
cursor.execute(f"DROP TABLE {table_name}")

Expand All @@ -48,7 +63,7 @@ def test_insert_async_running(connection: Connection) -> None:


def test_check_async_execution_from_another_connection(
connection_factory: Callable[..., Connection]
connection_factory: Callable[..., Connection],
) -> None:
connection_1 = connection_factory()
connection_2 = connection_factory()
Expand Down
Loading