Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions docsrc/Connecting_and_queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,73 @@ load on both server and client machines can be controlled. A suggested way is to
run(run_multiple_queries())


Server-side asynchronous query execution
==========================================
Firebolt supports server-side asynchronous query execution. This feature allows you to run
queries in the background and fetch the results later. This is especially useful for long-running
queries that you don't want to wait for or maintain a persistent connection to the server.

This feature is not to be confused with the Python SDK's asynchronous functionality, which is
described in the :ref:`Asynchronous query execution <connecting_and_queries:Asynchronous query execution>` section,
used to write concurrent code. Server-side asynchronous query execution is a feature of the
Firebolt engine itself.

Submitting an asynchronous query
--------------------------------

Use :py:meth:`firebolt.db.cursor.Cursor.execute_async` method to run query without maintaing a persistent connection.
This method will return immediately, and the query will be executed in the background. Return value
of execute_async is -1, which is the rowcount for queries where it's not applicable.
`cursor.async_query_token` attribute will contain a token that can be used to monitor the query status.

::

# Synchronous execution
cursor.execute("CREATE TABLE my_table (id INT, name TEXT, date_value DATE)")

# Asynchronous execution
cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')")
token = cursor.async_query_token

Trying to access `async_query_token` before calling `execute_async` will raise an exception.

.. note::
Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement
separately using multiple `execute_async` calls.

.. note::
Fetching data via SELECT is not supported and will raise an exception. execute_async is best suited for DML queries.

Monitoring the query status
----------------------------

To check the async query status you need to retrieve the token of the query. The token is a unique
identifier for the query and can be used to fetch the query status. You can store this token
outside of the current process and use it later to check the query status. :ref:`Connection <firebolt.db:Connection>` object
has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and
:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True
if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query
has finished successfully, None if query is still running and False if the query has failed.

::

while(connection.is_async_query_running(token)):
print("Query is still running")
time.sleep(1)
print("Query has finished")

success = connection.is_async_query_successful(token)
# success is None if the query is still running
if success is None:
# we should not reach this point since we've waited for is_async_query_running
raise Exception("The query is still running, use is_async_query_running to check the status")

if success:
print("Query was successful")
else:
print("Query failed")


Thread safety
==============================

Expand Down
60 changes: 56 additions & 4 deletions src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@
from firebolt.client import DEFAULT_API_URL
from firebolt.client.auth import Auth
from firebolt.client.client import AsyncClient, AsyncClientV1, AsyncClientV2
from firebolt.common.base_connection import BaseConnection
from firebolt.common.base_connection import (
ASYNC_QUERY_STATUS_REQUEST,
ASYNC_QUERY_STATUS_RUNNING,
ASYNC_QUERY_STATUS_SUCCESSFUL,
BaseConnection,
)
from firebolt.common.cache import _firebolt_system_engine_cache
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
from firebolt.utils.exception import ConfigurationError, ConnectionClosedError
from firebolt.utils.exception import (
ConfigurationError,
ConnectionClosedError,
FireboltError,
)
from firebolt.utils.usage_tracker import get_user_agent_header
from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1

Expand Down Expand Up @@ -63,10 +72,9 @@ def __init__(
api_endpoint: str,
init_parameters: Optional[Dict[str, Any]] = None,
):
super().__init__()
super().__init__(cursor_type)
self.api_endpoint = api_endpoint
self.engine_url = engine_url
self.cursor_type = cursor_type
self._cursors: List[Cursor] = []
self._client = client
self.init_parameters = init_parameters or {}
Expand All @@ -81,6 +89,50 @@ def cursor(self, **kwargs: Any) -> Cursor:
self._cursors.append(c)
return c

# Server-side async methods
async def _get_async_query_status(self, token: str) -> str:
if self.cursor_type != CursorV2:
raise FireboltError(
"This method is only supported for connection with service account."
)
cursor = self.cursor()
await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token))
result = await cursor.fetchone()
if cursor.rowcount != 1 or not result:
raise FireboltError("Unexpected result from async query status request.")
columns = cursor.description
result_dict = dict(zip([column.name for column in columns], result))
return str(result_dict.get("status"))

async def is_async_query_running(self, token: str) -> bool:
"""
Check if an async query is still running.

Args:
token: Async query token. Can be obtained from Cursor.async_query_token.

Returns:
bool: True if async query is still running, False otherwise
"""
status = await self._get_async_query_status(token)
return status == ASYNC_QUERY_STATUS_RUNNING

async def is_async_query_successful(self, token: str) -> Optional[bool]:
"""
Check if an async query has finished and was successful.

Args:
token: Async query token. Can be obtained from Cursor.async_query_token.

Returns:
bool: None if the query is still running, True if successful,
False otherwise
"""
status = await self._get_async_query_status(token)
if status == ASYNC_QUERY_STATUS_RUNNING:
return None
return status == ASYNC_QUERY_STATUS_SUCCESSFUL

# Context manager support
async def __aenter__(self) -> Connection:
if self.closed:
Expand Down
152 changes: 124 additions & 28 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
Expand Down Expand Up @@ -39,16 +40,18 @@
UPDATE_PARAMETERS_HEADER,
BaseCursor,
CursorState,
RowSet,
_parse_update_endpoint,
_parse_update_parameters,
_raise_if_internal_set_parameter,
async_not_allowed,
check_not_closed,
check_query_executed,
)
from firebolt.utils.exception import (
EngineNotRunningError,
FireboltDatabaseError,
FireboltError,
NotSupportedError,
OperationalError,
ProgrammingError,
QueryTimeoutError,
Expand Down Expand Up @@ -186,53 +189,93 @@ async def _parse_response_headers(self, headers: Headers) -> None:
param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER))
self._update_set_parameters(param_dict)

@abstractmethod
async def execute_async(
self,
query: str,
parameters: Optional[Sequence[ParameterType]] = None,
skip_parsing: bool = False,
) -> int:
"""Execute a database query without maintaining a connection."""
...

async def _do_execute(
self,
raw_query: str,
parameters: Sequence[Sequence[ParameterType]],
skip_parsing: bool = False,
timeout: Optional[float] = None,
async_execution: bool = False,
) -> None:
self._reset()
# Allow users to manually skip parsing for performance improvement.
queries: List[Union[SetParameter, str]] = (
[raw_query] if skip_parsing else split_format_sql(raw_query, parameters)
)
timeout_controller = TimeoutController(timeout)

if len(queries) > 1 and async_execution:
raise FireboltError(
"Server side async does not support multi-statement queries"
)
try:
for query in queries:
start_time = time.time()
Cursor._log_query(query)
timeout_controller.raise_if_timeout()

if isinstance(query, SetParameter):
row_set: RowSet = (-1, None, None, None)
await self._validate_set_parameter(
query, timeout_controller.remaining()
)
else:
resp = await self._api_request(
query,
{"output_format": JSON_OUTPUT_FORMAT},
timeout=timeout_controller.remaining(),
)
await self._raise_if_error(resp)
await self._parse_response_headers(resp.headers)
row_set = self._row_set_from_response(resp)

self._append_row_set(row_set)

logger.info(
f"Query fetched {self.rowcount} rows in"
f" {time.time() - start_time} seconds."
await self._execute_single_query(
query, timeout_controller, async_execution
)

self._state = CursorState.DONE

except Exception:
self._state = CursorState.ERROR
raise

async def _execute_single_query(
self,
query: Union[SetParameter, str],
timeout_controller: TimeoutController,
async_execution: bool,
) -> None:
start_time = time.time()
Cursor._log_query(query)
timeout_controller.raise_if_timeout()

if isinstance(query, SetParameter):
if async_execution:
raise FireboltError(
"Server side async does not support set statements, "
"please use execute to set this parameter"
)
await self._validate_set_parameter(query, timeout_controller.remaining())
else:
await self._handle_query_execution(
query, timeout_controller, async_execution
)

if not async_execution:
logger.info(
f"Query fetched {self.rowcount} rows in"
f" {time.time() - start_time} seconds."
)
else:
logger.info("Query submitted for async execution.")

async def _handle_query_execution(
self, query: str, timeout_controller: TimeoutController, async_execution: bool
) -> None:
query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check the case for when not async_execution and query_params.get("async") is True?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That opens a can of worms that we are still debating. At the moment enforcing this is out of scope https://packboard.atlassian.net/browse/FIR-42854
Set async=true will probably be not allowed on the backend altogether.

if async_execution:
query_params["async"] = True
resp = await self._api_request(
query,
query_params,
timeout=timeout_controller.remaining(),
)
await self._raise_if_error(resp)
if async_execution:
self._parse_async_response(resp)
else:
await self._parse_response_headers(resp.headers)
row_set = self._row_set_from_response(resp)
self._append_row_set(row_set)

@check_not_closed
async def execute(
self,
Expand Down Expand Up @@ -346,6 +389,7 @@ async def nextset(self) -> None:

# Iteration support
@check_not_closed
@async_not_allowed
@check_query_executed
def __aiter__(self) -> Cursor:
return self
Expand Down Expand Up @@ -373,6 +417,7 @@ async def __aexit__(
self.close()

@check_not_closed
@async_not_allowed
@check_query_executed
async def __anext__(self) -> List[ColType]:
row = await self.fetchone()
Expand All @@ -392,6 +437,47 @@ def __init__(
assert isinstance(client, AsyncClientV2)
super().__init__(*args, client=client, connection=connection, **kwargs)

@check_not_closed
async def execute_async(
self,
query: str,
parameters: Optional[Sequence[ParameterType]] = None,
skip_parsing: bool = False,
) -> int:
"""
Execute a database query without maintating a connection.

Supported features:
Parameterized queries: placeholder characters ('?') are substituted
with values provided in `parameters`. Values are formatted to
be properly recognized by database and to exclude SQL injection.

Not supported:
Multi-statement queries: multiple statements, provided in a single query
and separated by semicolon.
SET statements: to provide additional query execution parameters, execute
`SET param=value` statement before it. Use `execute` method to set
parameters.

Args:
query (str): SQL query to execute
parameters (Optional[Sequence[ParameterType]]): A sequence of substitution
parameters. Used to replace '?' placeholders inside a query with
actual values
skip_parsing (bool): Flag to disable query parsing. This will
disable parameterized queries while potentially improving performance

Returns:
int: Always returns -1, as async execution does not return row count.
"""
await self._do_execute(
query,
[parameters] if parameters else [],
skip_parsing,
async_execution=True,
)
return -1

async def is_db_available(self, database_name: str) -> bool:
"""
Verify that the database exists.
Expand Down Expand Up @@ -468,3 +554,13 @@ async def _filter_request(self, endpoint: str, filters: dict) -> Response:
)
resp.raise_for_status()
return resp

async def execute_async(
self,
query: str,
parameters: Optional[Sequence[ParameterType]] = None,
skip_parsing: bool = False,
) -> int:
raise NotSupportedError(
"Async execution is not supported in this version " " of Firebolt."
)
2 changes: 1 addition & 1 deletion src/firebolt/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

DEFAULT_API_URL: str = "api.app.firebolt.io"
PROTOCOL_VERSION_HEADER_NAME = "Firebolt-Protocol-Version"
PROTOCOL_VERSION: str = "2.1"
PROTOCOL_VERSION: str = "2.3"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see we did a jump here. Can you please make sure this one is also ok?
https://packboard.atlassian.net/browse/FIR-37204

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have tests that confirm the behaviour described in there. I've just rerun them to be sure and everything passes.

_REQUEST_ERRORS: Tuple[Type, ...] = (
HTTPError,
InvalidURL,
Expand Down
Loading
Loading