Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/firebolt/common/base_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class BaseCursor:
"_rowcount",
"_rows",
"_idx",
"_idx_lock",
"_row_sets",
"_next_set_idx",
"_set_parameters",
Expand Down
8 changes: 6 additions & 2 deletions src/firebolt/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def __init__(

def cursor(self, **kwargs: Any) -> Cursor:
if self.closed:
raise ConnectionClosedError("Unable to create cursor: connection closed.")
raise ConnectionClosedError(
"Unable to create cursor: connection closed." # pragma: no mutate
)

with self._closing_lock.gen_rlock():
c = Cursor(client=self._client, connection=self, **kwargs)
Expand Down Expand Up @@ -168,7 +170,9 @@ def close(self) -> None:
# Context manager support
def __enter__(self) -> Connection:
if self.closed:
raise ConnectionClosedError("Connection is already closed.")
raise ConnectionClosedError(
"Connection is already closed." # pragma: no mutate
)
return self

def __exit__(
Expand Down
11 changes: 1 addition & 10 deletions src/firebolt/db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import re
import time
from threading import Lock
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -16,7 +15,6 @@
)

from httpx import Response, codes
from readerwriterlock.rwlock import RWLockWrite

from firebolt.client import Client
from firebolt.common._types import (
Expand Down Expand Up @@ -65,17 +63,10 @@ class Cursor(BaseCursor):
with the :py:func:`fetchmany` method
"""

__slots__ = BaseCursor.__slots__ + (
"_query_lock",
"_idx_lock",
)

def __init__(
self, *args: Any, client: Client, connection: Connection, **kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
self._query_lock = RWLockWrite()
self._idx_lock = Lock()
self._client = client
self.connection = connection

Expand All @@ -100,7 +91,7 @@ def _raise_if_error(self, resp: Response) -> None:
if not is_engine_running(self.connection, self.connection.engine_url):
raise EngineNotRunningError(
f"Firebolt engine {self.connection.engine_url} "
"needs to be running to run queries against it."
"needs to be running to run queries against it." # pragma: no mutate # noqa: E501
)
resp.raise_for_status()

Expand Down
25 changes: 24 additions & 1 deletion tests/unit/async_db/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
from firebolt.utils.token_storage import TokenSecureStorage


@mark.skip("__slots__ is broken on Connection class")
async def test_connection_attributes(connection: Connection) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment on why do we need this test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I didn't have a chance to properly test this one so it looks odd. Reworked it and marked to skip since slots has no effect in Connection class. :(

"""Test that no unexpected values can be set. Governed by __slots__"""
with raises(AttributeError):
connection.not_a_database = "dummy"


async def test_closed_connection(connection: Connection) -> None:
"""Connection methods are unavailable for closed connection."""
await connection.aclose()
Expand All @@ -34,13 +41,14 @@ async def test_closed_connection(connection: Connection) -> None:

async def test_cursors_closed_on_close(connection: Connection) -> None:
"""Connection closes all its cursors on close."""
assert connection.closed == False, "Initial state of connection is incorrect"
c1, c2 = connection.cursor(), connection.cursor()
assert (
len(connection._cursors) == 2
), "Invalid number of cursors stored in connection."

await connection.aclose()
assert connection.closed, "Connection was not closed on close."
assert connection.closed == True, "Connection was not closed on close."
assert c1.closed, "Cursor was not closed on connection close."
assert c2.closed, "Cursor was not closed on connection close."
assert len(connection._cursors) == 0, "Cursors left in connection after close."
Expand Down Expand Up @@ -197,6 +205,21 @@ async def test_connection_token_caching(
mock_connection_flow()
mock_query()

# Using caching by default
with Patcher():
async with await connect(
database=db_name,
auth=ClientCredentials(client_id, client_secret, use_token_cache=True),
engine_name=engine_name,
account_name=account_name,
api_endpoint=server,
) as connection:
assert await connection.cursor().execute("select*") == len(
python_query_data
)
ts = TokenSecureStorage(username=client_id, password=client_secret)
assert ts.get_cached_token() == access_token, "Invalid token value cached"

with Patcher():
async with await connect(
database=db_name,
Expand Down
131 changes: 123 additions & 8 deletions tests/unit/async_db/test_cursor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Dict, List
from typing import Any, Callable, Dict, List
from unittest.mock import patch

from httpx import HTTPStatusError, StreamError, codes
Expand All @@ -8,12 +8,16 @@
from firebolt.async_db import Cursor
from firebolt.common._types import Column
from firebolt.common.base_cursor import ColType, CursorState, QueryStatus
from firebolt.common.settings import Settings
from firebolt.utils.exception import (
AsyncExecutionUnavailableError,
CursorClosedError,
DataError,
EngineNotRunningError,
FireboltDatabaseError,
FireboltEngineError,
OperationalError,
ProgrammingError,
QueryNotRunError,
)
from tests.unit.db_conftest import encode_param
Expand Down Expand Up @@ -189,8 +193,11 @@ async def test_cursor_execute(
async def test_cursor_execute_error(
httpx_mock: HTTPXMock,
query_url: str,
get_engines_url: str,
settings: Settings,
db_name: str,
query_statistics: Dict[str, Any],
cursor: Cursor,
get_engine_url_not_running_callback: Callable,
system_engine_query_url: str,
):
"""Cursor handles all types of errors properly."""
Expand Down Expand Up @@ -240,19 +247,95 @@ def http_error(*args, **kwargs):
str(excinfo.value) == "Error executing query:\nQuery error message"
), f"Invalid authentication error message for {message}."

# Database does not exist error
httpx_mock.add_response(
status_code=codes.FORBIDDEN,
content="Query error message",
url=query_url,
match_content=b"select * from t",
)
httpx_mock.add_response(
url=system_engine_query_url,
method="POST",
json={
"rows": "0",
"data": [],
"meta": [],
"statistics": query_statistics,
},
)
with raises(FireboltDatabaseError) as excinfo:
await query()
assert cursor._state == CursorState.ERROR
assert db_name in str(excinfo)

# Database exists but some other error
error_message = "My query error message"
httpx_mock.add_response(
status_code=codes.FORBIDDEN,
content=error_message,
url=query_url,
match_content=b"select * from t",
)
httpx_mock.add_response(
url=system_engine_query_url,
method="POST",
json={
"rows": "1",
"data": ["my_db"],
"meta": [],
"statistics": query_statistics,
},
)
with raises(ProgrammingError) as excinfo:
await query()
assert cursor._state == CursorState.ERROR
assert error_message in str(excinfo)

# Engine is not running error
httpx_mock.add_response(
status_code=codes.SERVICE_UNAVAILABLE,
content="Query error message",
url=query_url,
)
httpx_mock.add_callback(
get_engine_url_not_running_callback,
httpx_mock.add_response(
url=system_engine_query_url,
method="POST",
json={
"rows": "1",
"data": [[get_engines_url, "my_db", "Stopped"]],
"meta": [
{"name": "url", "type": "text"},
{"name": "attached_to", "type": "text"},
{"name": "status", "type": "text"},
],
"statistics": query_statistics,
},
)
with raises(EngineNotRunningError) as excinfo:
await query()
assert cursor._state == CursorState.ERROR
assert settings.server in str(excinfo)

# Engine does not exist
httpx_mock.add_response(
status_code=codes.SERVICE_UNAVAILABLE,
content="Query error message",
url=query_url,
)
httpx_mock.add_response(
url=system_engine_query_url,
method="POST",
json={
"rows": "0",
"data": [],
"meta": [],
"statistics": query_statistics,
},
)
with raises(FireboltEngineError) as excinfo:
await query()
assert cursor._state == CursorState.ERROR

httpx_mock.reset(True)

Expand Down Expand Up @@ -487,11 +570,11 @@ async def test_cursor_multi_statement(
for i, (desc, exp) in enumerate(zip(cursor.description, python_query_description)):
assert desc == exp, f"Invalid column description at position {i}"

assert cursor.statistics.elapsed == 0.002983335
assert cursor.statistics.time_before_execution == 0.002729331
assert cursor.statistics.time_to_execute == 0.000215215
assert cursor.statistics.elapsed == 0.116907717
assert cursor.statistics.time_before_execution == 0.012180623
assert cursor.statistics.time_to_execute == 0.104614307
assert cursor.statistics.rows_read == 1
assert cursor.statistics.bytes_read == 1
assert cursor.statistics.bytes_read == 61
assert cursor.statistics.scanned_bytes_cache == 0
assert cursor.statistics.scanned_bytes_storage == 0

Expand Down Expand Up @@ -664,7 +747,11 @@ async def test_cursor_server_side_async_cancel(
httpx_mock.add_callback(
server_side_async_cancel_callback, url=query_with_params_url
)
cursor._set_parameters = {"invalid_parameter": "should_not_be_present"}
await cursor.cancel(server_side_async_id)
cursor.close()
with raises(CursorClosedError):
await cursor.cancel(server_side_async_id)


async def test_cursor_server_side_async_get_status_completed(
Expand Down Expand Up @@ -727,3 +814,31 @@ async def test_cursor_server_side_async_get_status_error(
str(excinfo.value)
== f"Asynchronous query {server_side_async_id} status check failed."
), f"Invalid get_status error message."


async def test_cursor_iterate(
httpx_mock: HTTPXMock,
query_callback: Callable,
query_url: str,
cursor: Cursor,
python_query_data: List[List[ColType]],
):
"""Cursor is able to execute query, all fields are populated properly."""

httpx_mock.add_callback(query_callback, url=query_url)

with raises(QueryNotRunError):
async for res in cursor:
pass

await cursor.execute("select * from t")
i = 0
async for res in cursor:
assert res in python_query_data
i += 1
assert i == len(python_query_data), "Wrong number iterations of a cursor were done"

cursor.close()
with raises(CursorClosedError):
async for res in cursor:
pass
18 changes: 18 additions & 0 deletions tests/unit/db/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ def connection(
yield connection


@fixture
def system_connection(
server: str,
db_name: str,
auth: Auth,
account_name: str,
mock_system_connection_flow: Callable,
) -> Connection:
mock_system_connection_flow()
with connect(
database=db_name,
auth=auth,
account_name=account_name,
api_endpoint=server,
) as connection:
yield connection


@fixture
def cursor(connection: Connection) -> Cursor:
return connection.cursor()
Loading