Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ install_requires =
async-generator>=1.10
async-property>=0.2.1
cryptography>=3.4.0
httpcore<0.17.3
httpx[http2]==0.24.0
python-dateutil>=2.8.2
readerwriterlock>=1.0.9
Expand Down
12 changes: 5 additions & 7 deletions src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
import socket
from types import TracebackType
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -29,9 +28,7 @@
InterfaceError,
)
from firebolt.utils.usage_tracker import get_user_agent_header
from firebolt.utils.util import fix_url_schema

logger = logging.getLogger(__name__)
from firebolt.utils.util import Timer, fix_url_schema


class OverriddenHttpBackend(AutoBackend):
Expand Down Expand Up @@ -231,9 +228,10 @@ async def connect(

else:
try:
engine_url, status, attached_db = await _get_engine_url_status_db(
system_engine_connection, engine_name
)
with Timer("[PERFORMANCE] Resolving engine name "):
engine_url, status, attached_db = await _get_engine_url_status_db(
system_engine_connection, engine_name
)

if status != "Running":
raise EngineNotRunningError(engine_name)
Expand Down
23 changes: 15 additions & 8 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
if TYPE_CHECKING:
from firebolt.async_db.connection import Connection

from firebolt.utils.util import Timer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,14 +200,20 @@ async def _do_execute(
skip_parsing,
async_execution,
)
response = await self._api_request(
query,
{
"async_execution": 1,
"advanced_mode": 1,
"output_format": JSON_OUTPUT_FORMAT,
},
)

with Timer(
f"[PERFORMANCE] Running query {query[:50]} "
f"{'... ' if len(query) > 50 else ''}"
):
response = await self._api_request(
query,
{
"async_execution": 1,
"advanced_mode": 1,
"output_format": JSON_OUTPUT_FORMAT,
},
)

await self._raise_if_error(response)
if response.headers.get("content-length", "") == "0":
raise OperationalError("No response to asynchronous query.")
Expand Down
21 changes: 11 additions & 10 deletions src/firebolt/client/auth/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from httpx import Request, Response, codes

from firebolt.utils.token_storage import TokenSecureStorage
from firebolt.utils.util import cached_property
from firebolt.utils.util import Timer, cached_property


class AuthRequest(Request):
Expand Down Expand Up @@ -107,15 +107,16 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]:
Yields:
Request: Request required for auth flow
"""
if not self.token or self.expired:
yield from self.get_new_token_generator()
self._cache_token()
with Timer("[PERFORMANCE] Authentication "):
if not self.token or self.expired:
yield from self.get_new_token_generator()
self._cache_token()

request.headers["Authorization"] = f"Bearer {self.token}"
request.headers["Authorization"] = f"Bearer {self.token}"

response = yield request
response = yield request

if response.status_code == codes.UNAUTHORIZED:
yield from self.get_new_token_generator()
request.headers["Authorization"] = f"Bearer {self.token}"
yield request
if response.status_code == codes.UNAUTHORIZED:
yield from self.get_new_token_generator()
request.headers["Authorization"] = f"Bearer {self.token}"
yield request
13 changes: 10 additions & 3 deletions src/firebolt/common/base_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
DataError,
QueryNotRunError,
)
from firebolt.utils.util import Timer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -345,7 +346,9 @@ def fetchone(self) -> Optional[List[ColType]]:
# We are out of elements
return None
assert self._rows is not None
return self._parse_row(self._rows[left])
with Timer("[PERFORMANCE] Parsing query output into native Python types "):
result = self._parse_row(self._rows[left])
return result

@check_not_closed
@check_query_executed
Expand All @@ -358,7 +361,9 @@ def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]:
left, right = self._get_next_range(size)
assert self._rows is not None
rows = self._rows[left:right]
return [self._parse_row(row) for row in rows]
with Timer("[PERFORMANCE] Parsing query output into native Python types "):
result = [self._parse_row(row) for row in rows]
return result

@check_not_closed
@check_query_executed
Expand All @@ -367,7 +372,9 @@ def fetchall(self) -> List[List[ColType]]:
left, right = self._get_next_range(self.rowcount)
assert self._rows is not None
rows = self._rows[left:right]
return [self._parse_row(row) for row in rows]
with Timer("[PERFORMANCE] Parsing query output into native Python types "):
result = [self._parse_row(row) for row in rows]
return result

@check_not_closed
def setinputsizes(self, sizes: List[int]) -> None:
Expand Down
28 changes: 28 additions & 0 deletions src/firebolt/utils/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
from functools import lru_cache
from os import environ
from time import time
from types import TracebackType
from typing import TYPE_CHECKING, Callable, Type, TypeVar

from httpx import URL

T = TypeVar("T")
logger = logging.getLogger(__name__)


def cached_property(func: Callable[..., T]) -> T:
Expand Down Expand Up @@ -99,3 +104,26 @@ def merge_urls(base: URL, merge: URL) -> URL:
merge_raw_path = base.raw_path + merge.raw_path.lstrip(b"/")
return base.copy_with(raw_path=merge_raw_path)
return merge


class Timer:
def __init__(self, message: str = ""):
self._message = message

def __enter__(self) -> "Timer":
self._start_time: float = time()
return self

def __exit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
self.elapsed_time: str = "{:.2f}".format(round((time() - self._start_time), 2))
if (
environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") == "1"
and self._message != ""
):
log_message = self._message + self.elapsed_time + "s"
logger.debug(log_message)