diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 5509b337503..a77ec93bdd5 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -45,5 +45,5 @@ jobs: API_ENDPOINT: "api.dev.firebolt.io" ACCOUNT_NAME: "firebolt" run: | - pytest -o log_cli=true -o log_cli_level=INFO tests/integration + pytest -n 6 -o log_cli=true -o log_cli_level=INFO tests/integration diff --git a/setup.cfg b/setup.cfg index 9a8b6d25aed..314d1e2f0b6 100755 --- a/setup.cfg +++ b/setup.cfg @@ -49,6 +49,8 @@ dev = pytest-cov==3.0.0 pytest-httpx==0.18.0 pytest-mock==3.6.1 + pytest-timeout==2.1.0 + pytest-xdist==2.5.0 [options.package_data] firebolt = py.typed diff --git a/tests/integration/dbapi/async/test_queries_async.py b/tests/integration/dbapi/async/test_queries_async.py index 1a1d39a0b69..22853fbdeb2 100644 --- a/tests/integration/dbapi/async/test_queries_async.py +++ b/tests/integration/dbapi/async/test_queries_async.py @@ -64,7 +64,14 @@ async def test_select( data, all_types_query_response, "Invalid data returned by fetchmany" ) - # AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly + +@mark.asyncio +@mark.timeout(timeout=400, method="signal") +async def test_long_query( + connection: Connection, +) -> None: + """AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly""" + with connection.cursor() as c: await c.execute( "SELECT sleepEachRow(1) from numbers(360)", set_parameters={"advanced_mode": "1", "use_standard_sql": "0"}, @@ -92,35 +99,39 @@ async def test_query(c: Cursor, query: str) -> None: """Create table query is handled properly""" with connection.cursor() as c: # Cleanup - await c.execute("DROP JOIN INDEX IF EXISTS test_db_join_idx") - await c.execute("DROP AGGREGATING INDEX IF EXISTS test_db_agg_idx") - await c.execute("DROP TABLE IF EXISTS test_tb") - await c.execute("DROP TABLE IF EXISTS test_tb_dim") + await c.execute("DROP JOIN INDEX IF EXISTS test_drop_create_async_db_join_idx") + await c.execute( + "DROP AGGREGATING INDEX IF EXISTS test_drop_create_async_db_agg_idx" + ) + await c.execute("DROP TABLE IF EXISTS test_drop_create_async_tb") + await c.execute("DROP TABLE IF EXISTS test_drop_create_async_tb_dim") # Fact table await test_query( c, - "CREATE FACT TABLE test_tb(id int, sn string null, f float," + "CREATE FACT TABLE test_drop_create_async(id int, sn string null, f float," "d date, dt datetime, b bool, a array(int)) primary index id", ) # Dimension table await test_query( c, - "CREATE DIMENSION TABLE test_tb_dim(id int, sn string null, f float," - "d date, dt datetime, b bool, a array(int))", + "CREATE DIMENSION TABLE test_drop_create_async_dim(id int, sn string null" + ", f float, d date, dt datetime, b bool, a array(int))", ) # Create join index await test_query( - c, "CREATE JOIN INDEX test_db_join_idx ON test_tb_dim(id, sn, f)" + c, + "CREATE JOIN INDEX test_db_join_idx ON " + "test_drop_create_async_dim(id, sn, f)", ) # Create aggregating index await test_query( c, "CREATE AGGREGATING INDEX test_db_agg_idx ON " - "test_tb(id, sum(f), count(dt))", + "test_drop_create_async(id, sum(f), count(dt))", ) # Drop join index @@ -130,11 +141,11 @@ async def test_query(c: Cursor, query: str) -> None: await test_query(c, "DROP AGGREGATING INDEX test_db_agg_idx") # Test drop once again - await test_query(c, "DROP TABLE test_tb") - await test_query(c, "DROP TABLE IF EXISTS test_tb") + await test_query(c, "DROP TABLE test_drop_create_async") + await test_query(c, "DROP TABLE IF EXISTS test_drop_create_async") - await test_query(c, "DROP TABLE test_tb_dim") - await test_query(c, "DROP TABLE IF EXISTS test_tb_dim") + await test_query(c, "DROP TABLE test_drop_create_async_dim") + await test_query(c, "DROP TABLE IF EXISTS test_drop_create_async_dim") @mark.asyncio @@ -155,20 +166,23 @@ async def test_empty_query(c: Cursor, query: str) -> None: await c.fetchall() with connection.cursor() as c: - await c.execute("DROP TABLE IF EXISTS test_tb") + await c.execute("DROP TABLE IF EXISTS test_insert_async_tb") await c.execute( - "CREATE FACT TABLE test_tb(id int, sn string null, f float," + "CREATE FACT TABLE test_insert_async_tb(id int, sn string null, f float," "d date, dt datetime, b bool, a array(int)) primary index id" ) await test_empty_query( c, - "INSERT INTO test_tb VALUES (1, 'sn', 1.1, '2021-01-01'," + "INSERT INTO test_insert_async_tb VALUES (1, 'sn', 1.1, '2021-01-01'," "'2021-01-01 01:01:01', true, [1, 2, 3])", ) assert ( - await c.execute("SELECT * FROM test_tb ORDER BY test_tb.id") == 1 + await c.execute( + "SELECT * FROM test_insert_async_tb ORDER BY test_insert_async_tb.id" + ) + == 1 ), "Invalid data length in table after insert." assert_deep_eq( @@ -206,9 +220,9 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None: await c.fetchall() with connection.cursor() as c: - await c.execute("DROP TABLE IF EXISTS test_tb_parameterized") + await c.execute("DROP TABLE IF EXISTS test_tb_async_parameterized") await c.execute( - "CREATE FACT TABLE test_tb_parameterized(i int, f float, s string, sn" + "CREATE FACT TABLE test_tb_async_parameterized(i int, f float, s string, sn" " string null, d date, dt datetime, b bool, a array(int), ss string)" " primary index i" ) @@ -226,7 +240,8 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None: await test_empty_query( c, - "INSERT INTO test_tb_parameterized VALUES (?, ?, ?, ?, ?, ?, ?, ?, '\\?')", + "INSERT INTO test_tb_async_parameterized VALUES " + "(?, ?, ?, ?, ?, ?, ?, ?, '\\?')", params, ) @@ -237,7 +252,7 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None: params[6] = 1 assert ( - await c.execute("SELECT * FROM test_tb_parameterized") == 1 + await c.execute("SELECT * FROM test_tb_async_parameterized") == 1 ), "Invalid data length in table after parameterized insert" assert_deep_eq( @@ -252,16 +267,17 @@ async def test_multi_statement_query(connection: Connection) -> None: """Query parameters are handled properly""" with connection.cursor() as c: - await c.execute("DROP TABLE IF EXISTS test_tb_multi_statement") + await c.execute("DROP TABLE IF EXISTS test_tb_async_multi_statement") await c.execute( - "CREATE FACT TABLE test_tb_multi_statement(i int, s string) primary index i" + "CREATE FACT TABLE test_tb_async_multi_statement(i int, s string)" + " primary index i" ) assert ( await c.execute( - "INSERT INTO test_tb_multi_statement values (1, 'a'), (2, 'b');" - "SELECT * FROM test_tb_multi_statement;" - "SELECT * FROM test_tb_multi_statement WHERE i <= 1" + "INSERT INTO test_tb_async_multi_statement values (1, 'a'), (2, 'b');" + "SELECT * FROM test_tb_async_multi_statement" + "SELECT * FROM test_tb_async_multi_statement WHERE i <= 1" ) == -1 ), "Invalid row count returned for insert" diff --git a/tests/integration/dbapi/sync/test_queries.py b/tests/integration/dbapi/sync/test_queries.py index 4e91e501194..787a96952e8 100644 --- a/tests/integration/dbapi/sync/test_queries.py +++ b/tests/integration/dbapi/sync/test_queries.py @@ -1,7 +1,7 @@ from datetime import date, datetime from typing import Any, List -from pytest import raises +from pytest import mark, raises from firebolt.async_db._types import ColType from firebolt.async_db.cursor import Column @@ -61,7 +61,13 @@ def test_select( data, all_types_query_response, "Invalid data returned by fetchmany" ) - # AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly + +@mark.timeout(timeout=400, method="signal") +def test_long_query( + connection: Connection, +) -> None: + """AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly""" + with connection.cursor() as c: c.execute( "SELECT sleepEachRow(1) from numbers(360)", set_parameters={"advanced_mode": "1", "use_standard_sql": "0"}, @@ -88,47 +94,51 @@ def test_query(c: Cursor, query: str) -> None: """Create table query is handled properly""" with connection.cursor() as c: # Cleanup - c.execute("DROP JOIN INDEX IF EXISTS test_db_join_idx") - c.execute("DROP AGGREGATING INDEX IF EXISTS test_db_agg_idx") - c.execute("DROP TABLE IF EXISTS test_tb") - c.execute("DROP TABLE IF EXISTS test_tb_dim") + c.execute("DROP JOIN INDEX IF EXISTS test_drop_create_db_join_idx") + c.execute("DROP AGGREGATING INDEX IF EXISTS test_drop_create_db_agg_idx") + c.execute("DROP TABLE IF EXISTS test_drop_create_tb") + c.execute("DROP TABLE IF EXISTS test_drop_create_tb_dim") # Fact table test_query( c, - "CREATE FACT TABLE test_tb(id int, sn string null, f float," + "CREATE FACT TABLE test_drop_create_tb(id int, sn string null, f float," "d date, dt datetime, b bool, a array(int)) primary index id", ) # Dimension table test_query( c, - "CREATE DIMENSION TABLE test_tb_dim(id int, sn string null, f float," - "d date, dt datetime, b bool, a array(int))", + "CREATE DIMENSION TABLE test_drop_create_tb_dim(id int, sn string null" + ", f float, d date, dt datetime, b bool, a array(int))", ) # Create join index - test_query(c, "CREATE JOIN INDEX test_db_join_idx ON test_tb_dim(id, sn, f)") + test_query( + c, + "CREATE JOIN INDEX test_drop_create_db_join_idx ON " + "test_drop_create_tb_dim(id, sn, f)", + ) # Create aggregating index test_query( c, - "CREATE AGGREGATING INDEX test_db_agg_idx ON " - "test_tb(id, sum(f), count(dt))", + "CREATE AGGREGATING INDEX test_drop_create_db_agg_idx ON " + "test_drop_create_tb(id, sum(f), count(dt))", ) # Drop join index - test_query(c, "DROP JOIN INDEX test_db_join_idx") + test_query(c, "DROP JOIN INDEX test_drop_create_db_join_idx") # Drop aggregating index - test_query(c, "DROP AGGREGATING INDEX test_db_agg_idx") + test_query(c, "DROP AGGREGATING INDEX test_drop_create_db_agg_idx") # Test drop once again - test_query(c, "DROP TABLE test_tb") - test_query(c, "DROP TABLE IF EXISTS test_tb") + test_query(c, "DROP TABLE test_drop_create_tb") + test_query(c, "DROP TABLE IF EXISTS test_drop_create_tb") - test_query(c, "DROP TABLE test_tb_dim") - test_query(c, "DROP TABLE IF EXISTS test_tb_dim") + test_query(c, "DROP TABLE test_drop_create_tb_dim") + test_query(c, "DROP TABLE IF EXISTS test_drop_create_tb_dim") def test_insert(connection: Connection) -> None: @@ -148,20 +158,20 @@ def test_empty_query(c: Cursor, query: str) -> None: c.fetchall() with connection.cursor() as c: - c.execute("DROP TABLE IF EXISTS test_tb") + c.execute("DROP TABLE IF EXISTS test_insert_tb") c.execute( - "CREATE FACT TABLE test_tb(id int, sn string null, f float," + "CREATE FACT TABLE test_insert_tb(id int, sn string null, f float," "d date, dt datetime, b bool, a array(int)) primary index id" ) test_empty_query( c, - "INSERT INTO test_tb VALUES (1, 'sn', 1.1, '2021-01-01'," + "INSERT INTO test_insert_tb VALUES (1, 'sn', 1.1, '2021-01-01'," "'2021-01-01 01:01:01', true, [1, 2, 3])", ) assert ( - c.execute("SELECT * FROM test_tb ORDER BY test_tb.id") == 1 + c.execute("SELECT * FROM test_insert_tb ORDER BY test_insert_tb.id") == 1 ), "Invalid data length in table after insert" assert_deep_eq(