Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/firebolt/async_db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Timestamp,
TimestampFromTicks,
)
from firebolt.common.constants import ParameterStyle
from firebolt.utils.exception import (
DatabaseError,
DataError,
Expand All @@ -34,4 +35,10 @@
apilevel = "2.0"
# threads may only share the module and connections, cursors should not be shared
threadsafety = 2
paramstyle = "qmark"
paramstyle = ParameterStyle.QMARK.value
"""
The parameter style for SQL queries. Supported values:
- 'qmark': Use ? as parameter placeholders (default, client-side substitution)
- 'native': Alias for 'qmark'
- 'fb_numeric': Use $1, $2, ... as placeholders (server-side, sent as query_parameters)
"""
8 changes: 7 additions & 1 deletion src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,13 @@ async def connect_v1(
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
headers={"User-Agent": user_agent_header},
)
return Connection(engine_url, database, client, CursorV1, api_endpoint)
return Connection(
engine_url,
database,
client,
CursorV1,
api_endpoint,
)


def connect_core(
Expand Down
65 changes: 52 additions & 13 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
UPDATE_ENDPOINT_HEADER,
UPDATE_PARAMETERS_HEADER,
CursorState,
ParameterStyle,
)
from firebolt.common.cursor.base_cursor import (
BaseCursor,
Expand Down Expand Up @@ -216,27 +217,65 @@ async def _do_execute(
) -> None:
await self._close_rowset_and_reset()
self._row_set = StreamingAsyncRowSet() if streaming else InMemoryAsyncRowSet()
queries: List[Union[SetParameter, str]] = (
[raw_query]
if skip_parsing
else self._formatter.split_format_sql(raw_query, parameters)
)
timeout_controller = TimeoutController(timeout)
# Import paramstyle from module level
from firebolt.async_db import paramstyle

if len(queries) > 1 and async_execution:
raise FireboltError(
"Server side async does not support multi-statement queries"
)
try:
for query in queries:
await self._execute_single_query(
query, timeout_controller, async_execution, streaming
parameter_style = ParameterStyle(paramstyle)
except ValueError:
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
try:
if parameter_style == ParameterStyle.FB_NUMERIC:
await self._execute_fb_numeric(
raw_query, parameters, timeout, async_execution, streaming
)
else:
queries: List[Union[SetParameter, str]] = (
[raw_query]
if skip_parsing
else self._formatter.split_format_sql(raw_query, parameters)
)
timeout_controller = TimeoutController(timeout)
if len(queries) > 1 and async_execution:
raise FireboltError(
"Server side async does not support multi-statement queries"
)
for query in queries:
await self._execute_single_query(
query, timeout_controller, async_execution, streaming
)
self._state = CursorState.DONE
except Exception:
self._state = CursorState.ERROR
raise

async def _execute_fb_numeric(
self,
query: str,
parameters: Sequence[Sequence[ParameterType]],
timeout: Optional[float],
async_execution: bool,
streaming: bool,
) -> None:
Cursor._log_query(query)
timeout_controller = TimeoutController(timeout)
timeout_controller.raise_if_timeout()
query_params = self._build_fb_numeric_query_params(
parameters, streaming, async_execution
)
resp = await self._api_request(
query,
query_params,
timeout=timeout_controller.remaining(),
)
await self._raise_if_error(resp)
if async_execution:
await resp.aread()
self._parse_async_response(resp)
else:
await self._parse_response_headers(resp.headers)
await self._append_row_set_from_response(resp)

async def _execute_single_query(
self,
query: Union[SetParameter, str],
Expand Down
6 changes: 6 additions & 0 deletions src/firebolt/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class CursorState(Enum):
CLOSED = 4


class ParameterStyle(Enum):
QMARK = "qmark" # ? as parameter placeholders (default, client-side)
NATIVE = "native" # Alias for 'qmark'
FB_NUMERIC = "fb_numeric" # $1, $2, ... as placeholders (server-side)


# Parameters that should be set using USE instead of SET
USE_PARAMETER_LIST = ["database", "engine"]
# parameters that can only be set by the backend
Expand Down
41 changes: 39 additions & 2 deletions src/firebolt/common/cursor/base_cursor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import annotations

import json
import logging
import re
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union

from httpx import URL, Response

from firebolt.common._types import RawColType, SetParameter
from firebolt.common._types import ParameterType, RawColType, SetParameter
from firebolt.common.constants import (
DISALLOWED_PARAMETER_LIST,
IMMUTABLE_PARAMETER_LIST,
Expand Down Expand Up @@ -226,6 +227,42 @@ def _log_query(query: Union[str, SetParameter]) -> None:
):
logger.debug(f"Running query: {query}")

def _build_fb_numeric_query_params(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can consider moving this one to StatementFormatter as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I agree on this one, this method builds the query parameter dictionary to be sent in the API request. IMO, this is less formatting and more request building so cursor is a better place for it.

self,
parameters: Sequence[Sequence[ParameterType]],
streaming: bool,
async_execution: bool,
) -> Dict[str, Any]:
"""
Build query parameters dictionary for fb_numeric paramstyle.

Args:
parameters: A sequence of parameter sequences. For fb_numeric,
only the first parameter sequence is used.
streaming: Whether streaming is enabled
async_execution: Whether async execution is enabled

Returns:
Dictionary of query parameters to send with the request
"""
param_list = parameters[0] if parameters else []
query_parameters = [
{
"name": f"${i+1}",
"value": self._formatter.convert_parameter_for_serialization(value),
}
for i, value in enumerate(param_list)
]

query_params: Dict[str, Any] = {
"output_format": self._get_output_format(streaming),
}
if query_parameters:
query_params["query_parameters"] = json.dumps(query_parameters)
if async_execution:
query_params["async"] = True
return query_params

@property
def engine_name(self) -> str:
"""
Expand Down
28 changes: 28 additions & 0 deletions src/firebolt/common/statement_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,34 @@ def format_value(self, value: ParameterType) -> str:

raise DataError(f"unsupported parameter type {type(value)}")

def convert_parameter_for_serialization(
self, value: ParameterType
) -> Union[int, float, bool, None, str, List]:
"""
Convert parameter values for fb_numeric paramstyle to JSON-serializable
format. This is used for server-side parameter substitution.

Basic types (int, float, bool, None) are preserved as-is.
All other types are converted to strings for JSON serialization.

Args:
value: The parameter value to convert

Returns:
JSON-serializable value (int, float, bool, None, or string)
"""
if isinstance(value, (int, float, bool)) or value is None:
return value

if isinstance(value, Decimal):
return str(value)
elif isinstance(value, bytes):
return value.decode("utf-8")
elif isinstance(value, list):
return [self.convert_parameter_for_serialization(item) for item in value]
else:
return str(value)

def format_statement(
self, statement: Statement, parameters: Sequence[ParameterType]
) -> str:
Expand Down
9 changes: 8 additions & 1 deletion src/firebolt/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Timestamp,
TimestampFromTicks,
)
from firebolt.common.constants import ParameterStyle
from firebolt.db.connection import Connection, connect
from firebolt.db.cursor import Cursor
from firebolt.utils.exception import (
Expand All @@ -34,4 +35,10 @@
apilevel = "2.0"
# threads may only share the module and connections, cursors should not be shared
threadsafety = 2
paramstyle = "qmark"
paramstyle = ParameterStyle.QMARK.value
"""
The parameter style for SQL queries. Supported values:
- 'qmark': Use ? as parameter placeholders (default, client-side substitution)
- 'native': Alias for 'qmark'
- 'fb_numeric': Use $1, $2, ... as placeholders (server-side, sent as query_parameters)
"""
1 change: 0 additions & 1 deletion src/firebolt/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def __init__(
def cursor(self, **kwargs: Any) -> Cursor:
if self.closed:
raise ConnectionClosedError("Unable to create cursor: connection closed.")

c = self.cursor_type(client=self._client, connection=self, **kwargs)
self._cursors.append(c)
return c
Expand Down
65 changes: 52 additions & 13 deletions src/firebolt/db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
UPDATE_ENDPOINT_HEADER,
UPDATE_PARAMETERS_HEADER,
CursorState,
ParameterStyle,
)
from firebolt.common.cursor.base_cursor import (
BaseCursor,
Expand Down Expand Up @@ -222,27 +223,65 @@ def _do_execute(
) -> None:
self._close_rowset_and_reset()
self._row_set = StreamingRowSet() if streaming else InMemoryRowSet()
queries: List[Union[SetParameter, str]] = (
[raw_query]
if skip_parsing
else self._formatter.split_format_sql(raw_query, parameters)
)
timeout_controller = TimeoutController(timeout)
# Import paramstyle from module level
from firebolt.db import paramstyle

if len(queries) > 1 and async_execution:
raise FireboltError(
"Server side async does not support multi-statement queries"
)
try:
for query in queries:
self._execute_single_query(
query, timeout_controller, async_execution, streaming
parameter_style = ParameterStyle(paramstyle)
except ValueError:
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
try:
if parameter_style == ParameterStyle.FB_NUMERIC:
self._execute_fb_numeric(
raw_query, parameters, timeout, async_execution, streaming
)
else:
queries: List[Union[SetParameter, str]] = (
[raw_query]
if skip_parsing
else self._formatter.split_format_sql(raw_query, parameters)
)
timeout_controller = TimeoutController(timeout)
if len(queries) > 1 and async_execution:
raise FireboltError(
"Server side async does not support multi-statement queries"
)
for query in queries:
self._execute_single_query(
query, timeout_controller, async_execution, streaming
)
self._state = CursorState.DONE
except Exception:
self._state = CursorState.ERROR
raise

def _execute_fb_numeric(
self,
query: str,
parameters: Sequence[Sequence[ParameterType]],
timeout: Optional[float],
async_execution: bool,
streaming: bool,
) -> None:
Cursor._log_query(query)
timeout_controller = TimeoutController(timeout)
timeout_controller.raise_if_timeout()
query_params = self._build_fb_numeric_query_params(
parameters, streaming, async_execution
)
resp = self._api_request(
query,
query_params,
timeout=timeout_controller.remaining(),
)
self._raise_if_error(resp)
if async_execution:
resp.read()
self._parse_async_response(resp)
else:
self._parse_response_headers(resp.headers)
self._append_row_set_from_response(resp)

def _execute_single_query(
self,
query: Union[SetParameter, str],
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/dbapi/async/V2/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pytest import fixture

import firebolt.async_db
from firebolt.async_db import Connection, connect
from firebolt.client.auth.base import Auth
from firebolt.client.auth.client_credentials import ClientCredentials
Expand Down Expand Up @@ -164,3 +165,12 @@ async def mixed_case_db_and_engine(
await system_cursor.execute(f'DROP DATABASE "{test_db_name}"')
await system_cursor.execute(f'STOP ENGINE "{test_engine_name}"')
await system_cursor.execute(f'DROP ENGINE "{test_engine_name}"')


@fixture
def fb_numeric_paramstyle():
"""Fixture that sets paramstyle to fb_numeric and resets it after the test."""
original_paramstyle = firebolt.async_db.paramstyle
firebolt.async_db.paramstyle = "fb_numeric"
yield
firebolt.async_db.paramstyle = original_paramstyle
Loading