From ddf539a7804b92fa0438680f4cbf4cce1e28077e Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Thu, 27 Jan 2022 11:06:45 +0200 Subject: [PATCH 1/6] fix jupyter notebook --- src/firebolt/async_db/connection.py | 8 ++-- src/firebolt/common/util.py | 61 ++++++++++++++++++++++++++--- src/firebolt/db/connection.py | 10 ++--- src/firebolt/db/cursor.py | 17 ++++++-- tests/unit/common/test_util.py | 19 ++++++++- 5 files changed, 96 insertions(+), 19 deletions(-) diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index e99fb8ae9a9..1505e9ac2ad 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -3,7 +3,7 @@ import socket from json import JSONDecodeError from types import TracebackType -from typing import Callable, List, Optional, Type +from typing import Any, Callable, List, Optional, Type from httpcore.backends.auto import AutoBackend from httpcore.backends.base import AsyncNetworkStream @@ -207,7 +207,7 @@ def __init__( self._cursors: List[BaseCursor] = [] self._is_closed = False - def cursor(self) -> BaseCursor: + def _cursor(self, **kwargs: Any) -> BaseCursor: """ Create new cursor object. """ @@ -215,7 +215,7 @@ def cursor(self) -> BaseCursor: if self.closed: raise ConnectionClosedError("Unable to create cursor: connection closed") - c = self.cursor_class(self._client, self) + c = self.cursor_class(self._client, self, **kwargs) self._cursors.append(c) return c @@ -279,7 +279,7 @@ class Connection(BaseConnection): aclose = BaseConnection._aclose def cursor(self) -> Cursor: - c = super().cursor() + c = super()._cursor() assert isinstance(c, Cursor) # typecheck return c diff --git a/src/firebolt/common/util.py b/src/firebolt/common/util.py index c9507f0d7f8..5d36c59aa37 100644 --- a/src/firebolt/common/util.py +++ b/src/firebolt/common/util.py @@ -1,6 +1,20 @@ -from asyncio import get_event_loop, new_event_loop, set_event_loop +from asyncio import ( + AbstractEventLoop, + get_event_loop, + new_event_loop, + set_event_loop, +) from functools import lru_cache, wraps -from typing import TYPE_CHECKING, Any, Callable, Type, TypeVar +from threading import Thread +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Coroutine, + Optional, + Type, + TypeVar, +) T = TypeVar("T") @@ -37,7 +51,39 @@ def fix_url_schema(url: str) -> str: return url if url.startswith("http") else f"https://{url}" -def async_to_sync(f: Callable) -> Callable: +class AsyncJobThread: + def __init__(self) -> None: + self.loop: Optional[AbstractEventLoop] = None + self.result: Optional[Any] = None + self.exception: Optional[BaseException] = None + super().__init__() + + def _initialize_loop(self) -> None: + if not self.loop: + try: + self.loop = get_event_loop() + except RuntimeError: + self.loop = new_event_loop() + set_event_loop(self.loop) + + def run(self, coro: Coroutine) -> None: + try: + self._initialize_loop() + assert self.loop is not None + self.result = self.loop.run_until_complete(coro) + except BaseException as e: + self.exception = e + + def execute(self, coro: Coroutine) -> Any: + thread = Thread(target=self.run, args=[coro]) + thread.start() + thread.join() + if self.exception: + raise self.exception + return self.result + + +def async_to_sync(f: Callable, async_job_thread: AsyncJobThread = None) -> Callable: @wraps(f) def sync(*args: Any, **kwargs: Any) -> Any: try: @@ -45,7 +91,12 @@ def sync(*args: Any, **kwargs: Any) -> Any: except RuntimeError: loop = new_event_loop() set_event_loop(loop) - res = loop.run_until_complete(f(*args, **kwargs)) - return res + # We are inside a running loop + if loop.is_running(): + nonlocal async_job_thread + if not async_job_thread: + async_job_thread = AsyncJobThread() + return async_job_thread.execute(f(*args, **kwargs)) + return loop.run_until_complete(f(*args, **kwargs)) return sync diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index dff31a3a48a..bcf4aa2489d 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -10,7 +10,7 @@ from firebolt.async_db.connection import BaseConnection as AsyncBaseConnection from firebolt.async_db.connection import async_connect_factory from firebolt.common.exception import ConnectionClosedError -from firebolt.common.util import async_to_sync +from firebolt.common.util import AsyncJobThread, async_to_sync from firebolt.db.cursor import Cursor DEFAULT_TIMEOUT_SECONDS: int = 5 @@ -33,7 +33,7 @@ class Connection(AsyncBaseConnection): are not implemented. """ - __slots__ = AsyncBaseConnection.__slots__ + ("_closing_lock",) + __slots__ = AsyncBaseConnection.__slots__ + ("_closing_lock", "_async_job_thread") cursor_class = Cursor @@ -42,18 +42,18 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Holding this lock for write means that connection is closing itself. # cursor() should hold this lock for read to read/write state self._closing_lock = RWLockWrite() + self._async_job_thread = AsyncJobThread() - @wraps(AsyncBaseConnection.cursor) def cursor(self) -> Cursor: with self._closing_lock.gen_rlock(): - c = super().cursor() + c = super()._cursor(async_job_thread=self._async_job_thread) assert isinstance(c, Cursor) # typecheck return c @wraps(AsyncBaseConnection._aclose) def close(self) -> None: with self._closing_lock.gen_wlock(): - async_to_sync(self._aclose)() + async_to_sync(self._aclose, self._async_job_thread)() # Context manager support def __enter__(self) -> Connection: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 54704787457..b5b5456552e 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -13,7 +13,7 @@ check_not_closed, check_query_executed, ) -from firebolt.common.util import async_to_sync +from firebolt.common.util import AsyncJobThread, async_to_sync class Cursor(AsyncBaseCursor): @@ -31,11 +31,16 @@ class Cursor(AsyncBaseCursor): with :py:func:`fetchmany` method """ - __slots__ = AsyncBaseCursor.__slots__ + ("_query_lock", "_idx_lock") + __slots__ = AsyncBaseCursor.__slots__ + ( + "_query_lock", + "_idx_lock", + "_async_job_thread", + ) def __init__(self, *args: Any, **kwargs: Any) -> None: self._query_lock = RWLockWrite() self._idx_lock = Lock() + self._async_job_thread: AsyncJobThread = kwargs.pop("async_job_thread") super().__init__(*args, **kwargs) @wraps(AsyncBaseCursor.execute) @@ -46,14 +51,18 @@ def execute( set_parameters: Optional[Dict] = None, ) -> int: with self._query_lock.gen_wlock(): - return async_to_sync(super().execute)(query, parameters, set_parameters) + return async_to_sync(super().execute, self._async_job_thread)( + query, parameters, set_parameters + ) @wraps(AsyncBaseCursor.executemany) def executemany( self, query: str, parameters_seq: Sequence[Sequence[ParameterType]] ) -> int: with self._query_lock.gen_wlock(): - return async_to_sync(super().executemany)(query, parameters_seq) + return async_to_sync(super().executemany, self._async_job_thread)( + query, parameters_seq + ) @wraps(AsyncBaseCursor._get_next_range) def _get_next_range(self, size: int) -> Tuple[int, int]: diff --git a/tests/unit/common/test_util.py b/tests/unit/common/test_util.py index 57437754a30..06a384d5d78 100644 --- a/tests/unit/common/test_util.py +++ b/tests/unit/common/test_util.py @@ -1,7 +1,7 @@ from asyncio import run from threading import Thread -from pytest import raises +from pytest import mark, raises from firebolt.common.util import async_to_sync @@ -50,3 +50,20 @@ async def task(): with raises(JobMarker): async_to_sync(task)() + + +@mark.asyncio +async def test_nested_loops() -> None: + """async_to_sync properly works inside a running loop""" + + class JobMarker(Exception): + pass + + async def task(): + raise JobMarker() + + with raises(JobMarker): + await task() + + with raises(JobMarker): + async_to_sync(task)() From f5c27c8fb58db372421deb8f04abf8816bff7f19 Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Thu, 27 Jan 2022 11:09:05 +0200 Subject: [PATCH 2/6] extend dbapi examples --- examples/dbapi.ipynb | 139 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 129 insertions(+), 10 deletions(-) diff --git a/examples/dbapi.ipynb b/examples/dbapi.ipynb index d5e52918d0b..3503d028de0 100644 --- a/examples/dbapi.ipynb +++ b/examples/dbapi.ipynb @@ -10,13 +10,14 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "bdd3e404", "metadata": {}, "outputs": [], "source": [ "from firebolt.db import connect\n", - "from firebolt.client import DEFAULT_API_URL" + "from firebolt.client import DEFAULT_API_URL\n", + "from datetime import datetime" ] }, { @@ -29,22 +30,22 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "0ce8b2d3", "metadata": {}, "outputs": [], "source": [ "# Only one of these two parameters should be specified\n", "engine_url = \"\"\n", - "engine_name = \"\"\n", + "engine_name = \"sburlakov_test_integration\"\n", "assert bool(engine_url) != bool(\n", " engine_name\n", "), \"Specify only one of engine_name and engine_url\"\n", "\n", - "database_name = \"\"\n", - "username = \"\"\n", - "password = \"\"\n", - "api_endpoint = DEFAULT_API_URL" + "database_name = \"sburlakov_test\"\n", + "username = \"stepan.burlakov@firebolt.io\"\n", + "password = \"Not55566\"\n", + "api_endpoint = \"api.dev.firebolt.io\" # DEFAULT_API_URL" ] }, { @@ -57,7 +58,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "646869f7", "metadata": {}, "outputs": [], @@ -98,8 +99,32 @@ " \"insert into test_table values (1, 'hello', '2021-01-01 01:01:01'),\"\n", " \"(2, 'world', '2022-02-02 02:02:02'),\"\n", " \"(3, '!', '2023-03-03 03:03:03')\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "b356295a", + "metadata": {}, + "source": [ + "### Parameterized query" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "929f5221", + "metadata": {}, + "outputs": [], + "source": [ + "cursor.execute(\n", + " \"insert into test_table values (?, ?, ?)\",\n", + " (3, \"single parameter set\", datetime.now()),\n", ")\n", - "cursor.execute(\"select * from test_table\")" + "cursor.executemany(\n", + " \"insert into test_table values (?, ?, ?)\",\n", + " ((4, \"multiple\", datetime.now()), (5, \"parameter sets\", datetime.fromtimestamp(0))),\n", + ")" ] }, { @@ -117,6 +142,7 @@ "metadata": {}, "outputs": [], "source": [ + "cursor.execute(\"select * from test_table\")\n", "print(\"Description: \", cursor.description)\n", "print(\"Rowcount: \", cursor.rowcount)" ] @@ -141,6 +167,67 @@ "print(cursor.fetchall())" ] }, + { + "cell_type": "markdown", + "id": "efc4ff0a", + "metadata": {}, + "source": [ + "## Multi-statement queries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "744817b1", + "metadata": {}, + "outputs": [], + "source": [ + "cursor.execute(\n", + " \"\"\"\n", + " select * from test_table where id < 4;\n", + " select * from test_table where id > 2;\n", + "\"\"\"\n", + ")\n", + "print(cursor._row_sets[0][2])\n", + "print(cursor._row_sets[1][2])\n", + "print(cursor._rows)\n", + "# print(\"First query: \", cursor.fetchall())\n", + "assert cursor.nextset()\n", + "print(cursor._rows)\n", + "# print(\"Secont query: \", cursor.fetchall())\n", + "assert cursor.nextset() is None" + ] + }, + { + "cell_type": "markdown", + "id": "02e5db2f", + "metadata": {}, + "source": [ + "### Error handling\n", + "If one query fails during the execution, all remaining queries are canceled.\n", + "However, you still can fetch results for successful queries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "888500a9", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " cursor.execute(\n", + " \"\"\"\n", + " select * from test_table where id < 4;\n", + " select * from test_table where wrong_field > 2;\n", + " select * from test_table\n", + " \"\"\"\n", + " )\n", + "except:\n", + " pass\n", + "cursor.fetchall()" + ] + }, { "cell_type": "markdown", "id": "b1cd4ff2", @@ -286,6 +373,38 @@ "source": [ "await print_results(async_cursor)" ] + }, + { + "cell_type": "markdown", + "id": "da36dd3f", + "metadata": {}, + "source": [ + "### Closing connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "83fc1686", + "metadata": {}, + "outputs": [], + "source": [ + "# manually\n", + "connection.close()\n", + "\n", + "# using context manager\n", + "with connect(\n", + " engine_url=engine_url,\n", + " engine_name=engine_name,\n", + " database=database_name,\n", + " username=username,\n", + " password=password,\n", + " api_endpoint=api_endpoint,\n", + ") as conn:\n", + " # create cursors, perform database queries\n", + " pass\n", + "conn.closed" + ] } ], "metadata": { From 8e7d78ff487d59b2f719b6c6215d1d07fb370c37 Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Thu, 27 Jan 2022 13:50:24 +0200 Subject: [PATCH 3/6] address comments --- examples/dbapi.ipynb | 9 ++++----- src/firebolt/common/util.py | 6 +++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/dbapi.ipynb b/examples/dbapi.ipynb index 3503d028de0..5623145076c 100644 --- a/examples/dbapi.ipynb +++ b/examples/dbapi.ipynb @@ -37,15 +37,14 @@ "source": [ "# Only one of these two parameters should be specified\n", "engine_url = \"\"\n", - "engine_name = \"sburlakov_test_integration\"\n", "assert bool(engine_url) != bool(\n", " engine_name\n", "), \"Specify only one of engine_name and engine_url\"\n", "\n", - "database_name = \"sburlakov_test\"\n", - "username = \"stepan.burlakov@firebolt.io\"\n", - "password = \"Not55566\"\n", - "api_endpoint = \"api.dev.firebolt.io\" # DEFAULT_API_URL" + "database_name = \"\"\n", + "username = \"\"\n", + "password = \"\"\n", + "api_endpoint = DEFAULT_API_URL" ] }, { diff --git a/src/firebolt/common/util.py b/src/firebolt/common/util.py index 5d36c59aa37..9d7209f8378 100644 --- a/src/firebolt/common/util.py +++ b/src/firebolt/common/util.py @@ -52,11 +52,15 @@ def fix_url_schema(url: str) -> str: class AsyncJobThread: + """ + Thread runner that allows running async tasks syncronously in a separate thread. + Caches loop to be reused in all threads + """ + def __init__(self) -> None: self.loop: Optional[AbstractEventLoop] = None self.result: Optional[Any] = None self.exception: Optional[BaseException] = None - super().__init__() def _initialize_loop(self) -> None: if not self.loop: From c9bd28baa7310018cba91c13cb4e14cec6fe2e85 Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Thu, 27 Jan 2022 13:51:56 +0200 Subject: [PATCH 4/6] clean outputs --- examples/dbapi.ipynb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/dbapi.ipynb b/examples/dbapi.ipynb index 5623145076c..5f0dfad3622 100644 --- a/examples/dbapi.ipynb +++ b/examples/dbapi.ipynb @@ -10,7 +10,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "bdd3e404", "metadata": {}, "outputs": [], @@ -30,7 +30,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "id": "0ce8b2d3", "metadata": {}, "outputs": [], @@ -57,7 +57,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "646869f7", "metadata": {}, "outputs": [], From 45500734b6fd2e662efda432dd15d32327542c88 Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Fri, 28 Jan 2022 12:45:43 +0200 Subject: [PATCH 5/6] extend comments --- src/firebolt/common/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/firebolt/common/util.py b/src/firebolt/common/util.py index 9d7209f8378..3c401c6d26b 100644 --- a/src/firebolt/common/util.py +++ b/src/firebolt/common/util.py @@ -55,6 +55,8 @@ class AsyncJobThread: """ Thread runner that allows running async tasks syncronously in a separate thread. Caches loop to be reused in all threads + It allows running async functions syncronously inside a running event loop. + Since nesting loops is not allowed, we create a separate thread for a new event loop """ def __init__(self) -> None: From caff871c3a629a5011e2c34a04787d74089fd0c2 Mon Sep 17 00:00:00 2001 From: Stepan Burlakov Date: Fri, 28 Jan 2022 13:20:02 +0200 Subject: [PATCH 6/6] extend comments 2 --- src/firebolt/common/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/firebolt/common/util.py b/src/firebolt/common/util.py index 3c401c6d26b..fed7da188dc 100644 --- a/src/firebolt/common/util.py +++ b/src/firebolt/common/util.py @@ -67,6 +67,7 @@ def __init__(self) -> None: def _initialize_loop(self) -> None: if not self.loop: try: + # despite the docs, this function fails if no loop is set self.loop = get_event_loop() except RuntimeError: self.loop = new_event_loop()