From 19bcb8476fc6b1313477a9d70a81fa82093a5014 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Wed, 5 Jul 2023 11:35:55 +0300 Subject: [PATCH 1/8] Added Performance debug info --- src/firebolt/async_db/connection.py | 14 ++++++++++++++ src/firebolt/async_db/cursor.py | 17 +++++++++++++++++ src/firebolt/client/auth/base.py | 16 ++++++++++++++++ src/firebolt/common/base_cursor.py | 19 ++++++++++++++++++- 4 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index f73fb28627a..06bb399db4e 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -2,6 +2,8 @@ import logging import socket +from os import environ +from time import time from types import TracebackType from typing import Any, Dict, List, Optional @@ -231,10 +233,22 @@ async def connect( else: try: + timerStart = time() engine_url, status, attached_db = await _get_engine_url_status_db( system_engine_connection, engine_name ) + teimerEnd = time() + envVar = "0" + try: + envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] + except Exception: + pass + + if envVar == "1": + totalTime = "{:.2f}".format(round((teimerEnd - timerStart), 2)) + logger.debug(f"[PERFORMANCE] Resolving engine name {totalTime}s") + if status != "Running": raise EngineNotRunningError(engine_name) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 850039ac451..1af9452a8c0 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -4,6 +4,7 @@ import re import time from functools import wraps +from os import environ from types import TracebackType from typing import ( TYPE_CHECKING, @@ -199,6 +200,8 @@ async def _do_execute( skip_parsing, async_execution, ) + + timerStart = time.time() response = await self._api_request( query, { @@ -207,6 +210,20 @@ async def _do_execute( "output_format": JSON_OUTPUT_FORMAT, }, ) + + timerEnd = time.time() + envVar = "0" + try: + envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] + except Exception: + pass + + if envVar == "1": + totalTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) + logger.debug( + f"[PERFORMANCE] Running query {query} {totalTime}s" + ) + await self._raise_if_error(response) if response.headers.get("content-length", "") == "0": raise OperationalError("No response to asynchronous query.") diff --git a/src/firebolt/client/auth/base.py b/src/firebolt/client/auth/base.py index 75d6b29d816..616e03b0939 100644 --- a/src/firebolt/client/auth/base.py +++ b/src/firebolt/client/auth/base.py @@ -1,3 +1,5 @@ +from logging import getLogger +from os import environ from time import time from typing import Generator, Optional @@ -7,6 +9,8 @@ from firebolt.utils.token_storage import TokenSecureStorage from firebolt.utils.util import cached_property +logger = getLogger(__name__) + class AuthRequest(Request): """Class to distinguish auth requests from regular""" @@ -107,6 +111,7 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: Yields: Request: Request required for auth flow """ + timerStart = time() if not self.token or self.expired: yield from self.get_new_token_generator() self._cache_token() @@ -119,3 +124,14 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: yield from self.get_new_token_generator() request.headers["Authorization"] = f"Bearer {self.token}" yield request + + timerEnd = time() + envVar = "0" + try: + envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] + except Exception: + pass + + if envVar == "1": + totalTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) + logger.debug(f"[PERFORMANCE] Authentication {totalTime}s") diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index b6c690bfce3..7550e72092c 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -4,6 +4,8 @@ from dataclasses import dataclass, fields from enum import Enum from functools import wraps +from os import environ +from time import time from types import TracebackType from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union @@ -367,7 +369,22 @@ def fetchall(self) -> List[List[ColType]]: left, right = self._get_next_range(self.rowcount) assert self._rows is not None rows = self._rows[left:right] - return [self._parse_row(row) for row in rows] + + timerStart = time() + result = [self._parse_row(row) for row in rows] + timerEnd = time() + envVar = "0" + try: + envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] + except Exception: + pass + + if envVar == "1": + tTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) + logger.debug( + f"[PERFORMANCE] Parsing query output into native Python types {tTime}s" + ) + return result @check_not_closed def setinputsizes(self, sizes: List[int]) -> None: From f28a6686417934b8ba4684e668e8787f081b6246 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Wed, 5 Jul 2023 16:00:23 +0300 Subject: [PATCH 2/8] Fixed comments of CR --- src/firebolt/async_db/connection.py | 22 ++++++------------- src/firebolt/async_db/cursor.py | 32 +++++++++++----------------- src/firebolt/client/auth/base.py | 33 ++++++++++++----------------- src/firebolt/common/base_cursor.py | 14 ++++-------- src/firebolt/utils/util.py | 16 ++++++++++++++ 5 files changed, 52 insertions(+), 65 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 06bb399db4e..7339cb0c4b7 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -3,7 +3,6 @@ import logging import socket from os import environ -from time import time from types import TracebackType from typing import Any, Dict, List, Optional @@ -31,7 +30,7 @@ InterfaceError, ) from firebolt.utils.usage_tracker import get_user_agent_header -from firebolt.utils.util import fix_url_schema +from firebolt.utils.util import Timer, fix_url_schema logger = logging.getLogger(__name__) @@ -233,21 +232,14 @@ async def connect( else: try: - timerStart = time() - engine_url, status, attached_db = await _get_engine_url_status_db( - system_engine_connection, engine_name - ) - - teimerEnd = time() - envVar = "0" - try: - envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] - except Exception: - pass + with Timer() as runTime: + engine_url, status, attached_db = await _get_engine_url_status_db( + system_engine_connection, engine_name + ) + envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") if envVar == "1": - totalTime = "{:.2f}".format(round((teimerEnd - timerStart), 2)) - logger.debug(f"[PERFORMANCE] Resolving engine name {totalTime}s") + logger.debug(f"[PERFORMANCE] Resolving engine name {runTime}s") if status != "Running": raise EngineNotRunningError(engine_name) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 1af9452a8c0..c3c36a7c7fc 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from firebolt.async_db.connection import Connection +from firebolt.utils.util import Timer logger = logging.getLogger(__name__) @@ -201,28 +202,19 @@ async def _do_execute( async_execution, ) - timerStart = time.time() - response = await self._api_request( - query, - { - "async_execution": 1, - "advanced_mode": 1, - "output_format": JSON_OUTPUT_FORMAT, - }, - ) - - timerEnd = time.time() - envVar = "0" - try: - envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] - except Exception: - pass + with Timer() as runTime: + response = await self._api_request( + query, + { + "async_execution": 1, + "advanced_mode": 1, + "output_format": JSON_OUTPUT_FORMAT, + }, + ) + envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") if envVar == "1": - totalTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) - logger.debug( - f"[PERFORMANCE] Running query {query} {totalTime}s" - ) + logger.debug(f"[PERFORMANCE] Running query {query} {runTime}s") await self._raise_if_error(response) if response.headers.get("content-length", "") == "0": diff --git a/src/firebolt/client/auth/base.py b/src/firebolt/client/auth/base.py index 616e03b0939..9837f3d3793 100644 --- a/src/firebolt/client/auth/base.py +++ b/src/firebolt/client/auth/base.py @@ -7,7 +7,7 @@ from httpx import Request, Response, codes from firebolt.utils.token_storage import TokenSecureStorage -from firebolt.utils.util import cached_property +from firebolt.utils.util import Timer, cached_property logger = getLogger(__name__) @@ -111,27 +111,20 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: Yields: Request: Request required for auth flow """ - timerStart = time() - if not self.token or self.expired: - yield from self.get_new_token_generator() - self._cache_token() + with Timer() as runTime: + if not self.token or self.expired: + yield from self.get_new_token_generator() + self._cache_token() - request.headers["Authorization"] = f"Bearer {self.token}" - - response = yield request - - if response.status_code == codes.UNAUTHORIZED: - yield from self.get_new_token_generator() request.headers["Authorization"] = f"Bearer {self.token}" - yield request - timerEnd = time() - envVar = "0" - try: - envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] - except Exception: - pass + response = yield request + + if response.status_code == codes.UNAUTHORIZED: + yield from self.get_new_token_generator() + request.headers["Authorization"] = f"Bearer {self.token}" + yield request + envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") if envVar == "1": - totalTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) - logger.debug(f"[PERFORMANCE] Authentication {totalTime}s") + logger.debug(f"[PERFORMANCE] Authentication {runTime}s") diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index 7550e72092c..10d373dcce2 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -5,7 +5,6 @@ from enum import Enum from functools import wraps from os import environ -from time import time from types import TracebackType from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union @@ -26,6 +25,7 @@ DataError, QueryNotRunError, ) +from firebolt.utils.util import Timer logger = logging.getLogger(__name__) @@ -370,17 +370,11 @@ def fetchall(self) -> List[List[ColType]]: assert self._rows is not None rows = self._rows[left:right] - timerStart = time() - result = [self._parse_row(row) for row in rows] - timerEnd = time() - envVar = "0" - try: - envVar = environ["FIREBOLT_SDK_PERFORMANCE_DEBUG"] - except Exception: - pass + with Timer() as tTime: + result = [self._parse_row(row) for row in rows] + envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") if envVar == "1": - tTime = "{:.2f}".format(round((timerEnd - timerStart), 2)) logger.debug( f"[PERFORMANCE] Parsing query output into native Python types {tTime}s" ) diff --git a/src/firebolt/utils/util.py b/src/firebolt/utils/util.py index c7b7533b847..d6d979d45c0 100644 --- a/src/firebolt/utils/util.py +++ b/src/firebolt/utils/util.py @@ -1,4 +1,6 @@ from functools import lru_cache +from time import time +from types import TracebackType from typing import TYPE_CHECKING, Callable, Type, TypeVar from httpx import URL @@ -99,3 +101,17 @@ def merge_urls(base: URL, merge: URL) -> URL: merge_raw_path = base.raw_path + merge.raw_path.lstrip(b"/") return base.copy_with(raw_path=merge_raw_path) return merge + + +class Timer: + def __enter__(self) -> "Timer": + self.start_time: float = time() + return self + + def __exit__( + self, + exc_type: Type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + self.elapsed_time: str = "{:.2f}".format(round((time() - self.start_time), 2)) From 7ba4b3db0e56901dd396c2a206a35e6e98587d27 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Wed, 5 Jul 2023 16:58:03 +0300 Subject: [PATCH 3/8] lock httpcore --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index f1fd9b6e343..6b42f9dc997 100755 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ install_requires = async-generator>=1.10 async-property>=0.2.1 cryptography>=3.4.0 + httpcore<0.17.3 httpx[http2]==0.24.0 python-dateutil>=2.8.2 readerwriterlock>=1.0.9 From cad7f64346716595138250cc69782e2047465952 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Wed, 5 Jul 2023 17:08:54 +0300 Subject: [PATCH 4/8] lil modification --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 6b42f9dc997..cc7dae3c4af 100755 --- a/setup.cfg +++ b/setup.cfg @@ -29,7 +29,7 @@ install_requires = async-generator>=1.10 async-property>=0.2.1 cryptography>=3.4.0 - httpcore<0.17.3 + httpcore<0.17.3 httpx[http2]==0.24.0 python-dateutil>=2.8.2 readerwriterlock>=1.0.9 From 574d3fcb8a145674d604994af0d5369c82db90e8 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Sun, 9 Jul 2023 10:26:34 +0300 Subject: [PATCH 5/8] Limited query length when being printed in performance debug --- src/firebolt/async_db/cursor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index c3c36a7c7fc..19359f6245e 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -214,7 +214,9 @@ async def _do_execute( envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") if envVar == "1": - logger.debug(f"[PERFORMANCE] Running query {query} {runTime}s") + logger.debug( + f"[PERFORMANCE] Running query {query[:50]} ... {runTime}s" + ) await self._raise_if_error(response) if response.headers.get("content-length", "") == "0": From 6f3bc673c73e3f79b770fc5aa0e745cf5c6cf320 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Tue, 11 Jul 2023 11:29:00 +0300 Subject: [PATCH 6/8] Added logging logic into the Timer --- src/firebolt/async_db/connection.py | 10 +--------- src/firebolt/async_db/cursor.py | 12 ++++-------- src/firebolt/client/auth/base.py | 10 +--------- src/firebolt/common/base_cursor.py | 8 +------- src/firebolt/utils/util.py | 16 ++++++++++++++-- 5 files changed, 21 insertions(+), 35 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index 7339cb0c4b7..b659f533ca7 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -1,8 +1,6 @@ from __future__ import annotations -import logging import socket -from os import environ from types import TracebackType from typing import Any, Dict, List, Optional @@ -32,8 +30,6 @@ from firebolt.utils.usage_tracker import get_user_agent_header from firebolt.utils.util import Timer, fix_url_schema -logger = logging.getLogger(__name__) - class OverriddenHttpBackend(AutoBackend): """ @@ -232,15 +228,11 @@ async def connect( else: try: - with Timer() as runTime: + with Timer("[PERFORMANCE] Resolving engine name "): engine_url, status, attached_db = await _get_engine_url_status_db( system_engine_connection, engine_name ) - envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") - if envVar == "1": - logger.debug(f"[PERFORMANCE] Resolving engine name {runTime}s") - if status != "Running": raise EngineNotRunningError(engine_name) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 19359f6245e..0ded9783f2c 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -4,7 +4,6 @@ import re import time from functools import wraps -from os import environ from types import TracebackType from typing import ( TYPE_CHECKING, @@ -202,7 +201,10 @@ async def _do_execute( async_execution, ) - with Timer() as runTime: + with Timer( + f"[PERFORMANCE] Running query {query[:50]} " + f"{'... ' if len(query) > 50 else ''}" + ): response = await self._api_request( query, { @@ -212,12 +214,6 @@ async def _do_execute( }, ) - envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") - if envVar == "1": - logger.debug( - f"[PERFORMANCE] Running query {query[:50]} ... {runTime}s" - ) - await self._raise_if_error(response) if response.headers.get("content-length", "") == "0": raise OperationalError("No response to asynchronous query.") diff --git a/src/firebolt/client/auth/base.py b/src/firebolt/client/auth/base.py index 9837f3d3793..cacd286913e 100644 --- a/src/firebolt/client/auth/base.py +++ b/src/firebolt/client/auth/base.py @@ -1,5 +1,3 @@ -from logging import getLogger -from os import environ from time import time from typing import Generator, Optional @@ -9,8 +7,6 @@ from firebolt.utils.token_storage import TokenSecureStorage from firebolt.utils.util import Timer, cached_property -logger = getLogger(__name__) - class AuthRequest(Request): """Class to distinguish auth requests from regular""" @@ -111,7 +107,7 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: Yields: Request: Request required for auth flow """ - with Timer() as runTime: + with Timer("[PERFORMANCE] Authentication "): if not self.token or self.expired: yield from self.get_new_token_generator() self._cache_token() @@ -124,7 +120,3 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]: yield from self.get_new_token_generator() request.headers["Authorization"] = f"Bearer {self.token}" yield request - - envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") - if envVar == "1": - logger.debug(f"[PERFORMANCE] Authentication {runTime}s") diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index 10d373dcce2..385a7c3f821 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -4,7 +4,6 @@ from dataclasses import dataclass, fields from enum import Enum from functools import wraps -from os import environ from types import TracebackType from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union @@ -370,14 +369,9 @@ def fetchall(self) -> List[List[ColType]]: assert self._rows is not None rows = self._rows[left:right] - with Timer() as tTime: + with Timer("[PERFORMANCE] Parsing query output into native Python types "): result = [self._parse_row(row) for row in rows] - envVar = environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") - if envVar == "1": - logger.debug( - f"[PERFORMANCE] Parsing query output into native Python types {tTime}s" - ) return result @check_not_closed diff --git a/src/firebolt/utils/util.py b/src/firebolt/utils/util.py index d6d979d45c0..5bde37e87a7 100644 --- a/src/firebolt/utils/util.py +++ b/src/firebolt/utils/util.py @@ -1,4 +1,6 @@ +import logging from functools import lru_cache +from os import environ from time import time from types import TracebackType from typing import TYPE_CHECKING, Callable, Type, TypeVar @@ -6,6 +8,7 @@ from httpx import URL T = TypeVar("T") +logger = logging.getLogger(__name__) def cached_property(func: Callable[..., T]) -> T: @@ -104,8 +107,11 @@ def merge_urls(base: URL, merge: URL) -> URL: class Timer: + def __init__(self, message: str = ""): + self._message = message + def __enter__(self) -> "Timer": - self.start_time: float = time() + self._start_time: float = time() return self def __exit__( @@ -114,4 +120,10 @@ def __exit__( exc_val: BaseException, exc_tb: TracebackType, ) -> None: - self.elapsed_time: str = "{:.2f}".format(round((time() - self.start_time), 2)) + self.elapsed_time = "{:.2f}".format(round((time() - self._start_time), 2)) + if ( + environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") == "1" + and self._message != "" + ): + log_message = self._message + self.elapsed_time + "s" + logger.debug(log_message) From 683db3e499c4b84ea30ecd10015a1da1c87e8028 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Tue, 11 Jul 2023 11:36:14 +0300 Subject: [PATCH 7/8] tiny change --- src/firebolt/utils/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/firebolt/utils/util.py b/src/firebolt/utils/util.py index 5bde37e87a7..9efcdceed8e 100644 --- a/src/firebolt/utils/util.py +++ b/src/firebolt/utils/util.py @@ -120,7 +120,7 @@ def __exit__( exc_val: BaseException, exc_tb: TracebackType, ) -> None: - self.elapsed_time = "{:.2f}".format(round((time() - self._start_time), 2)) + self.elapsed_time: str = "{:.2f}".format(round((time() - self._start_time), 2)) if ( environ.get("FIREBOLT_SDK_PERFORMANCE_DEBUG", "0") == "1" and self._message != "" From c578fe1928bc77ceb6e780c10d5b18a1abf8ad69 Mon Sep 17 00:00:00 2001 From: RotemFB Date: Tue, 11 Jul 2023 13:29:24 +0300 Subject: [PATCH 8/8] added Parsing query time for all fetches --- src/firebolt/common/base_cursor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index 385a7c3f821..9b78145cf11 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -346,7 +346,9 @@ def fetchone(self) -> Optional[List[ColType]]: # We are out of elements return None assert self._rows is not None - return self._parse_row(self._rows[left]) + with Timer("[PERFORMANCE] Parsing query output into native Python types "): + result = self._parse_row(self._rows[left]) + return result @check_not_closed @check_query_executed @@ -359,7 +361,9 @@ def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: left, right = self._get_next_range(size) assert self._rows is not None rows = self._rows[left:right] - return [self._parse_row(row) for row in rows] + with Timer("[PERFORMANCE] Parsing query output into native Python types "): + result = [self._parse_row(row) for row in rows] + return result @check_not_closed @check_query_executed @@ -368,10 +372,8 @@ def fetchall(self) -> List[List[ColType]]: left, right = self._get_next_range(self.rowcount) assert self._rows is not None rows = self._rows[left:right] - with Timer("[PERFORMANCE] Parsing query output into native Python types "): result = [self._parse_row(row) for row in rows] - return result @check_not_closed