diff --git a/src/firebolt/async_db/connection.py b/src/firebolt/async_db/connection.py index b659f533ca7..9953497dc3e 100644 --- a/src/firebolt/async_db/connection.py +++ b/src/firebolt/async_db/connection.py @@ -238,8 +238,8 @@ async def connect( if database is not None and database != attached_db: raise InterfaceError( - f"Engine {engine_name} is not attached to {database}, " - f"but to {attached_db}" + f"Engine {engine_name} is attached to {attached_db} " + f"instead of {database}" ) elif database is None: database = attached_db diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 0ded9783f2c..7d2d5e25a33 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -111,7 +111,7 @@ async def _raise_if_error(self, resp: Response) -> None: async def _api_request( self, query: str = "", - parameters: dict[str, Any] = {}, + parameters: Optional[dict[str, Any]] = None, path: str = "", use_set_parameters: bool = True, ) -> Response: @@ -130,8 +130,9 @@ async def _api_request( set parameters are sent. Setting this to False will allow self._set_parameters to be ignored. """ + parameters = parameters or {} if use_set_parameters: - parameters = {**(self._set_parameters or {}), **(parameters or {})} + parameters = {**(self._set_parameters or {}), **parameters} if self.connection.database: parameters["database"] = self.connection.database if self.connection._is_system: diff --git a/src/firebolt/common/base_cursor.py b/src/firebolt/common/base_cursor.py index 9b78145cf11..05fbb4ef738 100644 --- a/src/firebolt/common/base_cursor.py +++ b/src/firebolt/common/base_cursor.py @@ -337,6 +337,10 @@ def _get_next_range(self, size: int) -> Tuple[int, int]: self._idx = right return left, right + _performance_log_message = ( + "[PERFORMANCE] Parsing query output into native Python types" + ) + @check_not_closed @check_query_executed def fetchone(self) -> Optional[List[ColType]]: @@ -346,7 +350,7 @@ def fetchone(self) -> Optional[List[ColType]]: # We are out of elements return None assert self._rows is not None - with Timer("[PERFORMANCE] Parsing query output into native Python types "): + with Timer(self._performance_log_message): result = self._parse_row(self._rows[left]) return result @@ -361,7 +365,7 @@ def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]: left, right = self._get_next_range(size) assert self._rows is not None rows = self._rows[left:right] - with Timer("[PERFORMANCE] Parsing query output into native Python types "): + with Timer(self._performance_log_message): result = [self._parse_row(row) for row in rows] return result @@ -372,7 +376,7 @@ def fetchall(self) -> List[List[ColType]]: left, right = self._get_next_range(self.rowcount) assert self._rows is not None rows = self._rows[left:right] - with Timer("[PERFORMANCE] Parsing query output into native Python types "): + with Timer(self._performance_log_message): result = [self._parse_row(row) for row in rows] return result diff --git a/src/firebolt/db/connection.py b/src/firebolt/db/connection.py index 5cf64f6d410..453959127da 100644 --- a/src/firebolt/db/connection.py +++ b/src/firebolt/db/connection.py @@ -247,8 +247,8 @@ def connect( if database is not None and database != attached_db: raise InterfaceError( - f"Engine {engine_name} is not attached to {database}, " - f"but to {attached_db}" + f"Engine {engine_name} is attached to {attached_db} " + f"instead of {database}" ) elif database is None: database = attached_db diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 87e3ff960d8..4312a6960dc 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -98,7 +98,7 @@ def _raise_if_error(self, resp: Response) -> None: def _api_request( self, query: str = "", - parameters: dict[str, Any] = {}, + parameters: Optional[dict[str, Any]] = None, path: str = "", use_set_parameters: bool = True, ) -> Response: @@ -117,8 +117,9 @@ def _api_request( set parameters are sent. Setting this to False will allow self._set_parameters to be ignored. """ + parameters = parameters or {} if use_set_parameters: - parameters = {**(self._set_parameters or {}), **(parameters or {})} + parameters = {**(self._set_parameters or {}), **parameters} if self.connection.database: parameters["database"] = self.connection.database if self.connection._is_system: diff --git a/tests/integration/dbapi/async/test_errors_async.py b/tests/integration/dbapi/async/test_errors_async.py index 127fd327532..6a569f6683a 100644 --- a/tests/integration/dbapi/async/test_errors_async.py +++ b/tests/integration/dbapi/async/test_errors_async.py @@ -92,7 +92,8 @@ async def test_database_not_exists( await connection.cursor().execute("show tables") assert ( - str(exc_info.value) == f"Database {new_db_name} does not exist" + str(exc_info.value) + == f"Engine {engine_name} is attached to {database_name} instead of {new_db_name}" ), "Invalid database name error message." diff --git a/tests/integration/dbapi/async/test_queries_async.py b/tests/integration/dbapi/async/test_queries_async.py index 4a961cddd2c..a17da52fdbc 100644 --- a/tests/integration/dbapi/async/test_queries_async.py +++ b/tests/integration/dbapi/async/test_queries_async.py @@ -127,10 +127,10 @@ async def test_long_query( async def test_drop_create(connection: Connection) -> None: """Create and drop table/index queries are handled properly.""" - async def test_query(c: Cursor, query: str, empty_response=True) -> None: + async def test_query(c: Cursor, query: str) -> None: await c.execute(query) assert c.description == None - assert c.rowcount == (-1 if empty_response else 0) + assert c.rowcount == 0 """Create table query is handled properly""" with connection.cursor() as c: @@ -166,7 +166,6 @@ async def test_query(c: Cursor, query: str, empty_response=True) -> None: c, "CREATE AGGREGATING INDEX test_db_agg_idx ON " "test_drop_create_async(id, sum(f), count(dt))", - empty_response=False, ) # Drop join index diff --git a/tests/integration/dbapi/async/test_system_engine.py b/tests/integration/dbapi/async/test_system_engine.py deleted file mode 100644 index f04f27d04c3..00000000000 --- a/tests/integration/dbapi/async/test_system_engine.py +++ /dev/null @@ -1,213 +0,0 @@ -from typing import List - -from pytest import fixture, mark, raises - -from firebolt.async_db import Connection -from firebolt.common._types import ColType, Column -from firebolt.utils.exception import OperationalError -from tests.integration.dbapi.utils import assert_deep_eq - - -@fixture -def db_name(database_name): - return database_name + "_system_test" - - -@fixture -def second_db_name(database_name): - return database_name + "_system_test_two" - - -@fixture -def region(): - return "us-east-1" - - -@fixture -def engine_name(engine_name): - return engine_name + "_system_test" - - -@fixture -async def setup_dbs( - connection_system_engine, db_name, second_db_name, engine_name, region -): - with connection_system_engine.cursor() as cursor: - - await cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") - await cursor.execute(f"DROP DATABASE IF EXISTS {second_db_name}") - - await cursor.execute(create_database(name=db_name)) - - await cursor.execute(create_engine(engine_name, engine_specs(region))) - - await cursor.execute( - create_database(name=second_db_name, specs=db_specs(region, engine_name)) - ) - - yield - - await cursor.execute(f"DROP ENGINE IF EXISTS {engine_name}") - await cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") - await cursor.execute(f"DROP DATABASE IF EXISTS {second_db_name}") - - -async def test_system_engine( - connection_system_engine: Connection, - all_types_query: str, - all_types_query_description: List[Column], - all_types_query_system_engine_response: List[ColType], - timezone_name: str, -) -> None: - """Connecting with engine name is handled properly.""" - with connection_system_engine.cursor() as c: - assert await c.execute(all_types_query) == 1, "Invalid row count returned" - assert c.rowcount == 1, "Invalid rowcount value" - data = await c.fetchall() - assert len(data) == c.rowcount, "Invalid data length" - assert_deep_eq(data, all_types_query_system_engine_response, "Invalid data") - assert c.description == all_types_query_description, "Invalid description value" - assert len(data[0]) == len(c.description), "Invalid description length" - assert len(await c.fetchall()) == 0, "Redundant data returned by fetchall" - - # Different fetch types - await c.execute(all_types_query) - assert ( - await c.fetchone() == all_types_query_system_engine_response[0] - ), "Invalid fetchone data" - assert await c.fetchone() is None, "Redundant data returned by fetchone" - - await c.execute(all_types_query) - assert len(await c.fetchmany(0)) == 0, "Invalid data size returned by fetchmany" - data = await c.fetchmany() - assert len(data) == 1, "Invalid data size returned by fetchmany" - assert_deep_eq( - data, - all_types_query_system_engine_response, - "Invalid data returned by fetchmany", - ) - - if connection_system_engine.database: - await c.execute("show tables") - with raises(OperationalError): - await c.execute("create table test(id int) primary index id") - else: - await c.execute("show databases") - with raises(OperationalError): - await c.execute("show tables") - - -async def test_system_engine_no_db( - connection_system_engine_no_db: Connection, - all_types_query: str, - all_types_query_description: List[Column], - all_types_query_system_engine_response: List[ColType], - timezone_name: str, -) -> None: - """Connecting with engine name is handled properly.""" - await test_system_engine( - connection_system_engine_no_db, - all_types_query, - all_types_query_description, - all_types_query_system_engine_response, - timezone_name, - ) - - -def engine_specs(region): - return f"REGION = '{region}' " "SPEC = 'B1' " "SCALE = 1" - - -def create_database(name, specs=None): - query = f"CREATE DATABASE {name}" - query += f" WITH {specs}" if specs else "" - return query - - -def create_engine(name, specs=None): - query = f"CREATE ENGINE {name}" - query += f" WITH {specs}" if specs else "" - return query - - -def db_specs(region, attached_engine): - return ( - f"REGION = '{region}' " - f"ATTACHED_ENGINES = ('{attached_engine}') " - "DESCRIPTION = 'Sample description'" - ) - - -@mark.parametrize( - "query", - ["CREATE DIMENSION TABLE dummy(id INT)"], -) -async def test_query_errors(connection_system_engine, query): - with connection_system_engine.cursor() as cursor: - with raises(OperationalError): - await cursor.execute(query) - - -@mark.xdist_group(name="system_engine") -async def test_show_databases(setup_dbs, connection_system_engine, db_name): - with connection_system_engine.cursor() as cursor: - - await cursor.execute("SHOW DATABASES") - - dbs = [row[0] for row in await cursor.fetchall()] - - assert db_name in dbs - assert f"{db_name}_two" in dbs - - -@mark.xdist_group(name="system_engine") -async def test_detach_engine( - setup_dbs, connection_system_engine, engine_name, second_db_name -): - async def check_engine_exists(cursor, engine_name, db_name): - await cursor.execute("SHOW ENGINES") - engines = await cursor.fetchall() - # Results have the following columns - # engine_name, region, spec, scale, status, attached_to, version - assert engine_name in [row[0] for row in engines] - assert (engine_name, db_name) in [(row[0], row[5]) for row in engines] - - with connection_system_engine.cursor() as cursor: - await check_engine_exists(cursor, engine_name, db_name=second_db_name) - await cursor.execute(f"DETACH ENGINE {engine_name} FROM {second_db_name}") - - # When engine not attached db is - - await check_engine_exists(cursor, engine_name, db_name="-") - - await cursor.execute(f"ATTACH ENGINE {engine_name} TO {second_db_name}") - await check_engine_exists(cursor, engine_name, db_name=second_db_name) - - -@mark.xdist_group(name="system_engine") -async def test_alter_engine(setup_dbs, connection_system_engine, engine_name): - with connection_system_engine.cursor() as cursor: - await cursor.execute(f"ALTER ENGINE {engine_name} SET AUTO_STOP = 60") - - await cursor.execute( - "SELECT engine_name, auto_stop FROM information_schema.engines" - ) - engines = await cursor.fetchall() - assert [engine_name, 3600] in engines - - -@mark.xdist_group(name="system_engine") -async def test_start_stop_engine(setup_dbs, connection_system_engine, engine_name): - async def check_engine_status(cursor, engine_name, status): - await cursor.execute("SHOW ENGINES") - engines = await cursor.fetchall() - # Results have the following columns - # engine_name, region, spec, scale, status, attached_to, version - assert engine_name in [row[0] for row in engines] - assert (engine_name, status) in [(row[0], row[4]) for row in engines] - - with connection_system_engine.cursor() as cursor: - await check_engine_status(cursor, engine_name, "Stopped") - await cursor.execute(f"START ENGINE {engine_name}") - await check_engine_status(cursor, engine_name, "Running") - await cursor.execute(f"STOP ENGINE {engine_name}") - await check_engine_status(cursor, engine_name, "Stopped") diff --git a/tests/integration/dbapi/async/test_system_engine_async.py b/tests/integration/dbapi/async/test_system_engine_async.py new file mode 100644 index 00000000000..617503ad65a --- /dev/null +++ b/tests/integration/dbapi/async/test_system_engine_async.py @@ -0,0 +1,70 @@ +from typing import List + +from pytest import raises + +from firebolt.async_db import Connection +from firebolt.common._types import ColType, Column +from firebolt.utils.exception import OperationalError +from tests.integration.dbapi.utils import assert_deep_eq + + +async def test_system_engine( + connection_system_engine: Connection, + all_types_query: str, + all_types_query_description: List[Column], + all_types_query_system_engine_response: List[ColType], + timezone_name: str, +) -> None: + """Connecting with engine name is handled properly.""" + with connection_system_engine.cursor() as c: + assert await c.execute(all_types_query) == 1, "Invalid row count returned" + assert c.rowcount == 1, "Invalid rowcount value" + data = await c.fetchall() + assert len(data) == c.rowcount, "Invalid data length" + assert_deep_eq(data, all_types_query_system_engine_response, "Invalid data") + assert c.description == all_types_query_description, "Invalid description value" + assert len(data[0]) == len(c.description), "Invalid description length" + assert len(await c.fetchall()) == 0, "Redundant data returned by fetchall" + + # Different fetch types + await c.execute(all_types_query) + assert ( + await c.fetchone() == all_types_query_system_engine_response[0] + ), "Invalid fetchone data" + assert await c.fetchone() is None, "Redundant data returned by fetchone" + + await c.execute(all_types_query) + assert len(await c.fetchmany(0)) == 0, "Invalid data size returned by fetchmany" + data = await c.fetchmany() + assert len(data) == 1, "Invalid data size returned by fetchmany" + assert_deep_eq( + data, + all_types_query_system_engine_response, + "Invalid data returned by fetchmany", + ) + + if connection_system_engine.database: + await c.execute("show tables") + with raises(OperationalError): + await c.execute("create table test(id int) primary index id") + else: + await c.execute("show databases") + with raises(OperationalError): + await c.execute("show tables") + + +async def test_system_engine_no_db( + connection_system_engine_no_db: Connection, + all_types_query: str, + all_types_query_description: List[Column], + all_types_query_system_engine_response: List[ColType], + timezone_name: str, +) -> None: + """Connecting with engine name is handled properly.""" + await test_system_engine( + connection_system_engine_no_db, + all_types_query, + all_types_query_description, + all_types_query_system_engine_response, + timezone_name, + ) diff --git a/tests/integration/dbapi/conftest.py b/tests/integration/dbapi/conftest.py index 7495af88b94..9abc9336369 100644 --- a/tests/integration/dbapi/conftest.py +++ b/tests/integration/dbapi/conftest.py @@ -118,7 +118,7 @@ def all_types_query_response(timezone_offset_seconds: int) -> List[ColType]: 30000000000, -30000000000, 1.23, - 1.2345678901234, + 1.23456789012, "text", date(2021, 3, 28), date(1, 1, 1), diff --git a/tests/integration/dbapi/sync/test_errors.py b/tests/integration/dbapi/sync/test_errors.py index 088041d9833..aab26216492 100644 --- a/tests/integration/dbapi/sync/test_errors.py +++ b/tests/integration/dbapi/sync/test_errors.py @@ -93,7 +93,8 @@ def test_database_not_exists( connection.cursor().execute("show tables") assert ( - str(exc_info.value) == f"Database {new_db_name} does not exist" + str(exc_info.value) + == f"Engine {engine_name} is attached to {database_name} instead of {new_db_name}" ), "Invalid database name error message" diff --git a/tests/integration/dbapi/sync/test_queries.py b/tests/integration/dbapi/sync/test_queries.py index 32c7320fb16..3a034c628e6 100644 --- a/tests/integration/dbapi/sync/test_queries.py +++ b/tests/integration/dbapi/sync/test_queries.py @@ -133,10 +133,10 @@ def test_long_query( def test_drop_create(connection: Connection) -> None: """Create and drop table/index queries are handled properly.""" - def test_query(c: Cursor, query: str, empty_response=True) -> None: + def test_query(c: Cursor, query: str) -> None: c.execute(query) assert c.description == None - assert c.rowcount == (-1 if empty_response else 0) + assert c.rowcount == 0 """Create table query is handled properly""" with connection.cursor() as c: @@ -172,7 +172,6 @@ def test_query(c: Cursor, query: str, empty_response=True) -> None: c, "CREATE AGGREGATING INDEX test_drop_create_db_agg_idx ON " "test_drop_create_tb(id, sum(f), count(dt))", - empty_response=False, ) # Drop join index diff --git a/tests/integration/dbapi/sync/test_system_engine.py b/tests/integration/dbapi/sync/test_system_engine.py index 58b4ec10418..04ca5c9e277 100644 --- a/tests/integration/dbapi/sync/test_system_engine.py +++ b/tests/integration/dbapi/sync/test_system_engine.py @@ -1,6 +1,6 @@ from typing import List -from pytest import fixture, mark, raises +from pytest import raises from firebolt.common._types import ColType, Column from firebolt.db import Connection @@ -8,45 +8,6 @@ from tests.integration.dbapi.utils import assert_deep_eq -@fixture -def db_name(database_name): - return database_name + "_system_test" - - -@fixture -def second_db_name(database_name): - return database_name + "_system_test_two" - - -@fixture -def region(): - return "us-east-1" - - -@fixture -def engine_name(engine_name): - return engine_name + "_system_test" - - -@fixture -def setup_dbs(connection_system_engine, db_name, second_db_name, engine_name, region): - with connection_system_engine.cursor() as cursor: - - cursor.execute(create_database(name=db_name)) - - cursor.execute(create_engine(engine_name, engine_specs(region))) - - cursor.execute( - create_database(name=second_db_name, specs=db_specs(region, engine_name)) - ) - - yield - - cursor.execute(f"DROP ENGINE IF EXISTS {engine_name}") - cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") - cursor.execute(f"DROP DATABASE IF EXISTS {second_db_name}") - - def test_system_engine( connection_system_engine: Connection, all_types_query: str, @@ -107,100 +68,3 @@ def test_system_engine_no_db( all_types_query_system_engine_response, timezone_name, ) - - -def engine_specs(region): - return f"REGION = '{region}' " "SPEC = 'B1' " "SCALE = 1" - - -def create_database(name, specs=None): - query = f"CREATE DATABASE {name}" - query += f" WITH {specs}" if specs else "" - return query - - -def create_engine(name, specs=None): - query = f"CREATE ENGINE {name}" - query += f" WITH {specs}" if specs else "" - return query - - -def db_specs(region, attached_engine): - return ( - f"REGION = '{region}' " - f"ATTACHED_ENGINES = ('{attached_engine}') " - "DESCRIPTION = 'Sample description'" - ) - - -@mark.parametrize( - "query", - ["CREATE DIMENSION TABLE dummy(id INT)"], -) -def test_query_errors(connection_system_engine, query): - with connection_system_engine.cursor() as cursor: - with raises(OperationalError): - cursor.execute(query) - - -@mark.xdist_group(name="system_engine") -def test_show_databases(setup_dbs, connection_system_engine, db_name): - with connection_system_engine.cursor() as cursor: - - cursor.execute("SHOW DATABASES") - - dbs = [row[0] for row in cursor.fetchall()] - - assert db_name in dbs - assert f"{db_name}_two" in dbs - - -@mark.xdist_group(name="system_engine") -def test_detach_engine( - setup_dbs, connection_system_engine, engine_name, second_db_name -): - def check_engine_exists(cursor, engine_name, db_name): - cursor.execute("SHOW ENGINES") - engines = cursor.fetchall() - # Results have the following columns - # engine_name, region, spec, scale, status, attached_to, version - assert engine_name in [row[0] for row in engines] - assert (engine_name, db_name) in [(row[0], row[5]) for row in engines] - - with connection_system_engine.cursor() as cursor: - check_engine_exists(cursor, engine_name, db_name=second_db_name) - cursor.execute(f"DETACH ENGINE {engine_name} FROM {second_db_name}") - - # When engine not attached db is - - check_engine_exists(cursor, engine_name, db_name="-") - - cursor.execute(f"ATTACH ENGINE {engine_name} TO {second_db_name}") - check_engine_exists(cursor, engine_name, db_name=second_db_name) - - -@mark.xdist_group(name="system_engine") -def test_alter_engine(setup_dbs, connection_system_engine, engine_name): - with connection_system_engine.cursor() as cursor: - cursor.execute(f"ALTER ENGINE {engine_name} SET AUTO_STOP = 60") - - cursor.execute("SELECT engine_name, auto_stop FROM information_schema.engines") - engines = cursor.fetchall() - assert [engine_name, 3600] in engines - - -@mark.xdist_group(name="system_engine") -def test_start_stop_engine(setup_dbs, connection_system_engine, engine_name): - def check_engine_status(cursor, engine_name, status): - cursor.execute("SHOW ENGINES") - engines = cursor.fetchall() - # Results have the following columns - # engine_name, region, spec, scale, status, attached_to, version - assert engine_name in [row[0] for row in engines] - assert (engine_name, status) in [(row[0], row[4]) for row in engines] - - with connection_system_engine.cursor() as cursor: - check_engine_status(cursor, engine_name, "Stopped") - cursor.execute(f"START ENGINE {engine_name}") - check_engine_status(cursor, engine_name, "Running") - cursor.execute(f"STOP ENGINE {engine_name}") - check_engine_status(cursor, engine_name, "Stopped")