Skip to content
81 changes: 0 additions & 81 deletions docsrc/Connecting_and_queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -610,87 +610,6 @@ Note, however, that it is not possible to retrieve the results of a server-side
query, so these queries are best used for running DMLs and DDLs and ``SELECT``\ s should be used
only for warming the cache.

Executing asynchronous DDL commands
------------------------------------

.. _ddl_execution_example:

Executing queries server-side asynchronously is similar to executing server-side synchronous
queries, but the ``execute()`` command receives an extra parameter, ``async_execution=True``.
The example below uses ``cursor`` to create a new table called ``test_table``.
``execute(query, async_execution=True)`` will return a query ID, which can subsequently
be used to check the query status.

::

query_id = cursor.execute(
"""
CREATE FACT TABLE IF NOT EXISTS test_table (
id INT,
name TEXT
)
PRIMARY INDEX id;
""",
async_execution=True
)


To check the status of a query, send the query ID to ```get_status()``` to receive a
QueryStatus enumeration object. Possible statuses are:


* ``RUNNING``
* ``ENDED_SUCCESSFULLY``
* ``ENDED_UNSUCCESSFULLY``
* ``NOT_READY``
* ``STARTED_EXECUTION``
* ``PARSE_ERROR``
* ``CANCELED_EXECUTION``
* ``EXECUTION_ERROR``


Once the status of the table creation is ``ENDED_SUCCESSFULLY``, data can be inserted into it:

::

from firebolt.async_db.cursor import QueryStatus

query_status = cursor.get_status(query_id)

if query_status == QueryStatus.ENDED_SUCCESSFULLY:
cursor.execute(
"""
INSERT INTO test_table VALUES
(1, 'hello'),
(2, 'world'),
(3, '!');
"""
)


In addition, server-side asynchronous queries can be cancelled calling ``cancel()``.

::

query_id = cursor.execute(
"""
CREATE FACT TABLE IF NOT EXISTS test_table (
id INT,
name TEXT
)
PRIMARY INDEX id;
""",
async_execution=True
)

cursor.cancel(query_id)

query_status = cursor.get_status(query_id)

print(query_status)

**Returns**: ``CANCELED_EXECUTION``


Thread safety
==============================
Expand Down
107 changes: 8 additions & 99 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
import re
import time
from abc import ABCMeta, abstractmethod
from functools import wraps
Expand Down Expand Up @@ -35,7 +34,6 @@
UPDATE_PARAMETERS_HEADER,
BaseCursor,
CursorState,
QueryStatus,
Statistics,
_parse_update_endpoint,
_parse_update_parameters,
Expand All @@ -45,7 +43,6 @@
)
from firebolt.common.constants import ENGINE_STATUS_RUNNING_LIST
from firebolt.utils.exception import (
AsyncExecutionUnavailableError,
EngineNotRunningError,
FireboltDatabaseError,
FireboltEngineError,
Expand All @@ -57,7 +54,7 @@
if TYPE_CHECKING:
from firebolt.async_db.connection import Connection

from firebolt.utils.util import Timer, _print_error_body
from firebolt.utils.util import _print_error_body

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -126,12 +123,6 @@ async def _raise_if_error(self, resp: Response) -> None:
async def _validate_set_parameter(self, parameter: SetParameter) -> None:
"""Validate parameter by executing simple query with it."""
_raise_if_internal_set_parameter(parameter)
if parameter.name == "async_execution":
raise AsyncExecutionUnavailableError(
"It is not possible to set async_execution using a SET command. "
"Instead, pass it as an argument to the execute() or "
"executemany() function."
)
resp = await self._api_request("select 1", {parameter.name: parameter.value})
# Handle invalid set parameter
if resp.status_code == codes.BAD_REQUEST:
Expand Down Expand Up @@ -170,7 +161,6 @@ async def _do_execute(
raw_query: str,
parameters: Sequence[Sequence[ParameterType]],
skip_parsing: bool = False,
async_execution: Optional[bool] = False,
) -> None:
self._reset()
# Allow users to manually skip parsing for performance improvement.
Expand All @@ -180,13 +170,7 @@ async def _do_execute(
try:
for query in queries:
start_time = time.time()
# Our CREATE EXTERNAL TABLE queries currently require credentials,
# so we will skip logging those queries.
# https://docs.firebolt.io/sql-reference/commands/create-external-table.html
if isinstance(query, SetParameter) or not re.search(
"aws_key_id|credentials", query, flags=re.IGNORECASE
):
logger.debug(f"Running query: {query}")
Cursor._log_query(query)

# Define type for mypy
row_set: Tuple[
Expand All @@ -197,35 +181,6 @@ async def _do_execute(
] = (-1, None, None, None)
if isinstance(query, SetParameter):
await self._validate_set_parameter(query)
elif async_execution:
self._validate_server_side_async_settings(
parameters,
queries,
skip_parsing,
async_execution,
)

with Timer(
f"[PERFORMANCE] Running query {query[:50]} "
f"{'... ' if len(query) > 50 else ''}"
):
response = await self._api_request(
query,
{
"async_execution": 1,
"output_format": JSON_OUTPUT_FORMAT,
},
)

await self._raise_if_error(response)
if response.headers.get("content-length", "") == "0":
raise OperationalError("No response to asynchronous query.")
resp = response.json()
if "query_id" not in resp or resp["query_id"] == "":
raise OperationalError(
"Invalid response to asynchronous query: missing query_id."
)
self._query_id = resp["query_id"]
else:
resp = await self._api_request(
query, {"output_format": JSON_OUTPUT_FORMAT}
Expand Down Expand Up @@ -253,7 +208,6 @@ async def execute(
query: str,
parameters: Optional[Sequence[ParameterType]] = None,
skip_parsing: bool = False,
async_execution: Optional[bool] = False,
) -> Union[int, str]:
"""Prepare and execute a database query.

Expand All @@ -277,21 +231,19 @@ async def execute(
skip_parsing (bool): Flag to disable query parsing. This will
disable parameterized, multi-statement and SET queries,
while improving performance
async_execution (bool): flag to determine if query should be asynchronous

Returns:
int: Query row count.
"""
params_list = [parameters] if parameters else []
await self._do_execute(query, params_list, skip_parsing, async_execution)
return self.query_id if async_execution else self.rowcount
await self._do_execute(query, params_list, skip_parsing)
return self.rowcount

@check_not_closed
async def executemany(
self,
query: str,
parameters_seq: Sequence[Sequence[ParameterType]],
async_execution: Optional[bool] = False,
) -> Union[int, str]:
"""Prepare and execute a database query.

Expand All @@ -316,46 +268,12 @@ async def executemany(
substitution parameter sets. Used to replace '?' placeholders inside a
query with actual values from each set in a sequence. Resulting queries
for each subset are executed sequentially.
async_execution (bool): flag to determine if query should be asynchronous

Returns:
int|str: Query row count for synchronous execution of queries,
query ID string for asynchronous execution.
int: Query row count.
"""
await self._do_execute(query, parameters_seq, async_execution=async_execution)
if async_execution:
return self.query_id
else:
return self.rowcount

@check_not_closed
async def get_status(self, query_id: str) -> QueryStatus:
"""Get status of a server-side async query. Return the state of the query."""
try:
resp = await self._api_request(
# output_format must be empty for status to work correctly.
# And set parameters will cause 400 errors.
parameters={"query_id": query_id},
path="status",
use_set_parameters=False,
)
if resp.status_code == codes.BAD_REQUEST:
raise OperationalError(
f"Asynchronous query {query_id} status check failed: "
f"{resp.status_code}."
)
resp_json = resp.json()
if "status" not in resp_json:
raise OperationalError(
"Invalid response to asynchronous query: missing status."
)
except Exception:
self._state = CursorState.ERROR
raise
# Remember that query_id might be empty.
if resp_json["status"] == "":
return QueryStatus.NOT_READY
return QueryStatus[resp_json["status"]]
await self._do_execute(query, parameters_seq)
return self.rowcount

@abstractmethod
async def is_db_available(self, database: str) -> bool:
Expand Down Expand Up @@ -389,15 +307,6 @@ async def fetchall(self) -> List[List[ColType]]:
async def nextset(self) -> None:
return super().nextset()

@check_not_closed
async def cancel(self, query_id: str) -> None:
"""Cancel a server-side async query."""
await self._api_request(
parameters={"query_id": query_id},
path="cancel",
use_set_parameters=False,
)

# Iteration support
@check_not_closed
@check_query_executed
Expand Down Expand Up @@ -557,7 +466,7 @@ def __init__(
async def _api_request(
self,
query: Optional[str] = "",
parameters: Optional[dict[str, Any]] = {},
parameters: Optional[dict[str, Any]] = None,
path: Optional[str] = "",
use_set_parameters: Optional[bool] = True,
) -> Response:
Expand Down
55 changes: 12 additions & 43 deletions src/firebolt/common/base_cursor.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
from __future__ import annotations

import logging
import re
from dataclasses import dataclass, fields
from enum import Enum
from functools import wraps
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from httpx import URL, Response

from firebolt.common._types import (
ColType,
Column,
ParameterType,
RawColType,
SetParameter,
parse_type,
parse_value,
)
from firebolt.utils.exception import (
AsyncExecutionUnavailableError,
ConfigurationError,
CursorClosedError,
DataError,
Expand All @@ -40,19 +39,6 @@ class CursorState(Enum):
CLOSED = 4


class QueryStatus(Enum):
"""Enumeration of query responses on server-side async queries."""

RUNNING = 1
ENDED_SUCCESSFULLY = 2
ENDED_UNSUCCESSFULLY = 3
NOT_READY = 4
STARTED_EXECUTION = 5
PARSE_ERROR = 6
CANCELED_EXECUTION = 7
EXECUTION_ERROR = 8


# Parameters that should be set using USE instead of SET
USE_PARAMETER_LIST = ["database", "engine"]
# parameters that can only be set by the backend
Expand Down Expand Up @@ -325,6 +311,16 @@ def _update_server_parameters(self, parameters: Dict[str, Any]) -> None:
for key, value in parameters.items():
self.parameters[key] = value

@staticmethod
def _log_query(query: Union[str, SetParameter]) -> None:
# Our CREATE EXTERNAL TABLE queries currently require credentials,
# so we will skip logging those queries.
# https://docs.firebolt.io/sql-reference/commands/create-external-table.html
if isinstance(query, SetParameter) or not re.search(
"aws_key_id|credentials", query, flags=re.IGNORECASE
):
logger.debug(f"Running query: {query}")

@property
def engine_name(self) -> str:
"""
Expand Down Expand Up @@ -382,33 +378,6 @@ def _append_row_set(
# Populate values for first set
self._pop_next_set()

def _validate_server_side_async_settings(
self,
parameters: Sequence[Sequence[ParameterType]],
queries: List[Union[SetParameter, str]],
skip_parsing: bool = False,
async_execution: Optional[bool] = False,
) -> None:
if async_execution and self._set_parameters.get("use_standard_sql", "1") == "0":
raise AsyncExecutionUnavailableError(
"It is not possible to execute queries asynchronously if "
"use_standard_sql=0."
)
if parameters and skip_parsing:
logger.warning(
"Query formatting parameters are provided but skip_parsing "
"is specified. They will be ignored."
)
non_set_queries = 0
for query in queries:
if type(query) is not SetParameter:
non_set_queries += 1
if non_set_queries > 1 and async_execution:
raise AsyncExecutionUnavailableError(
"It is not possible to execute multi-statement "
"queries asynchronously."
)

def _parse_row(self, row: List[RawColType]) -> List[ColType]:
"""Parse a single data row based on query column types."""
assert len(row) == len(self.description)
Expand Down
Loading