Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ jobs:
API_ENDPOINT: "api.dev.firebolt.io"
ACCOUNT_NAME: "firebolt"
run: |
pytest -o log_cli=true -o log_cli_level=INFO tests/integration
pytest -n 6 -o log_cli=true -o log_cli_level=INFO tests/integration

2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ dev =
pytest-cov==3.0.0
pytest-httpx==0.18.0
pytest-mock==3.6.1
pytest-timeout==2.1.0
pytest-xdist==2.5.0

[options.package_data]
firebolt = py.typed
Expand Down
70 changes: 43 additions & 27 deletions tests/integration/dbapi/async/test_queries_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ async def test_select(
data, all_types_query_response, "Invalid data returned by fetchmany"
)

# AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly

@mark.asyncio
@mark.timeout(timeout=400, method="signal")
async def test_long_query(
connection: Connection,
) -> None:
"""AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly"""
with connection.cursor() as c:
await c.execute(
"SELECT sleepEachRow(1) from numbers(360)",
set_parameters={"advanced_mode": "1", "use_standard_sql": "0"},
Expand Down Expand Up @@ -92,35 +99,39 @@ async def test_query(c: Cursor, query: str) -> None:
"""Create table query is handled properly"""
with connection.cursor() as c:
# Cleanup
await c.execute("DROP JOIN INDEX IF EXISTS test_db_join_idx")
await c.execute("DROP AGGREGATING INDEX IF EXISTS test_db_agg_idx")
await c.execute("DROP TABLE IF EXISTS test_tb")
await c.execute("DROP TABLE IF EXISTS test_tb_dim")
await c.execute("DROP JOIN INDEX IF EXISTS test_db_join_idx")
await c.execute("DROP AGGREGATING INDEX IF EXISTS test_db_agg_idx")
await c.execute("DROP TABLE IF EXISTS test_drop_create_async")
await c.execute("DROP TABLE IF EXISTS test_drop_create_async_dim")

# Fact table
await test_query(
c,
"CREATE FACT TABLE test_tb(id int, sn string null, f float,"
"CREATE FACT TABLE test_drop_create_async(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int)) primary index id",
)

# Dimension table
await test_query(
c,
"CREATE DIMENSION TABLE test_tb_dim(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int))",
"CREATE DIMENSION TABLE test_drop_create_async_dim(id int, sn string null"
", f float, d date, dt datetime, b bool, a array(int))",
)

# Create join index
await test_query(
c, "CREATE JOIN INDEX test_db_join_idx ON test_tb_dim(id, sn, f)"
c,
"CREATE JOIN INDEX test_db_join_idx ON "
"test_drop_create_async_dim(id, sn, f)",
)

# Create aggregating index
await test_query(
c,
"CREATE AGGREGATING INDEX test_db_agg_idx ON "
"test_tb(id, sum(f), count(dt))",
"test_drop_create_async(id, sum(f), count(dt))",
)

# Drop join index
Expand All @@ -130,11 +141,11 @@ async def test_query(c: Cursor, query: str) -> None:
await test_query(c, "DROP AGGREGATING INDEX test_db_agg_idx")

# Test drop once again
await test_query(c, "DROP TABLE test_tb")
await test_query(c, "DROP TABLE IF EXISTS test_tb")
await test_query(c, "DROP TABLE test_drop_create_async")
await test_query(c, "DROP TABLE IF EXISTS test_drop_create_async")

await test_query(c, "DROP TABLE test_tb_dim")
await test_query(c, "DROP TABLE IF EXISTS test_tb_dim")
await test_query(c, "DROP TABLE test_drop_create_async_dim")
await test_query(c, "DROP TABLE IF EXISTS test_drop_create_async_dim")


@mark.asyncio
Expand All @@ -155,20 +166,23 @@ async def test_empty_query(c: Cursor, query: str) -> None:
await c.fetchall()

with connection.cursor() as c:
await c.execute("DROP TABLE IF EXISTS test_tb")
await c.execute("DROP TABLE IF EXISTS test_insert_async_tb")
await c.execute(
"CREATE FACT TABLE test_tb(id int, sn string null, f float,"
"CREATE FACT TABLE test_insert_async_tb(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int)) primary index id"
)

await test_empty_query(
c,
"INSERT INTO test_tb VALUES (1, 'sn', 1.1, '2021-01-01',"
"INSERT INTO test_insert_async_tb VALUES (1, 'sn', 1.1, '2021-01-01',"
"'2021-01-01 01:01:01', true, [1, 2, 3])",
)

assert (
await c.execute("SELECT * FROM test_tb ORDER BY test_tb.id") == 1
await c.execute(
"SELECT * FROM test_insert_async_tb ORDER BY test_insert_async_tb.id"
)
== 1
), "Invalid data length in table after insert."

assert_deep_eq(
Expand Down Expand Up @@ -206,9 +220,9 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None:
await c.fetchall()

with connection.cursor() as c:
await c.execute("DROP TABLE IF EXISTS test_tb_parameterized")
await c.execute("DROP TABLE IF EXISTS test_tb_async_parameterized")
await c.execute(
"CREATE FACT TABLE test_tb_parameterized(i int, f float, s string, sn"
"CREATE FACT TABLE test_tb_async_parameterized(i int, f float, s string, sn"
" string null, d date, dt datetime, b bool, a array(int), ss string)"
" primary index i"
)
Expand All @@ -226,7 +240,8 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None:

await test_empty_query(
c,
"INSERT INTO test_tb_parameterized VALUES (?, ?, ?, ?, ?, ?, ?, ?, '\\?')",
"INSERT INTO test_tb_async_parameterized VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?, '\\?')",
params,
)

Expand All @@ -237,7 +252,7 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None:
params[6] = 1

assert (
await c.execute("SELECT * FROM test_tb_parameterized") == 1
await c.execute("SELECT * FROM test_tb_async_parameterized") == 1
), "Invalid data length in table after parameterized insert"

assert_deep_eq(
Expand All @@ -252,16 +267,17 @@ async def test_multi_statement_query(connection: Connection) -> None:
"""Query parameters are handled properly"""

with connection.cursor() as c:
await c.execute("DROP TABLE IF EXISTS test_tb_multi_statement")
await c.execute("DROP TABLE IF EXISTS test_tb_async_multi_statement")
await c.execute(
"CREATE FACT TABLE test_tb_multi_statement(i int, s string) primary index i"
"CREATE FACT TABLE test_tb_async_multi_statement(i int, s string)"
" primary index i"
)

assert (
await c.execute(
"INSERT INTO test_tb_multi_statement values (1, 'a'), (2, 'b');"
"SELECT * FROM test_tb_multi_statement;"
"SELECT * FROM test_tb_multi_statement WHERE i <= 1"
"INSERT INTO test_tb_async_multi_statement values (1, 'a'), (2, 'b');"
"SELECT * FROM test_tb_async_multi_statement;"
"SELECT * FROM test_tb_async_multi_statement WHERE i <= 1"
)
== -1
), "Invalid row count returned for insert"
Expand Down
54 changes: 32 additions & 22 deletions tests/integration/dbapi/sync/test_queries.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import date, datetime
from typing import Any, List

from pytest import raises
from pytest import mark, raises

from firebolt.async_db._types import ColType
from firebolt.async_db.cursor import Column
Expand Down Expand Up @@ -61,7 +61,13 @@ def test_select(
data, all_types_query_response, "Invalid data returned by fetchmany"
)

# AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly

@mark.timeout(timeout=400, method="signal")
def test_long_query(
connection: Connection,
) -> None:
"""AWS ALB TCP timeout set to 350, make sure we handle the keepalive correctly"""
with connection.cursor() as c:
c.execute(
"SELECT sleepEachRow(1) from numbers(360)",
set_parameters={"advanced_mode": "1", "use_standard_sql": "0"},
Expand All @@ -88,47 +94,51 @@ def test_query(c: Cursor, query: str) -> None:
"""Create table query is handled properly"""
with connection.cursor() as c:
# Cleanup
c.execute("DROP JOIN INDEX IF EXISTS test_db_join_idx")
c.execute("DROP AGGREGATING INDEX IF EXISTS test_db_agg_idx")
c.execute("DROP TABLE IF EXISTS test_tb")
c.execute("DROP TABLE IF EXISTS test_tb_dim")
c.execute("DROP JOIN INDEX IF EXISTS test_drop_create_db_join_idx")
c.execute("DROP AGGREGATING INDEX IF EXISTS test_drop_create_db_agg_idx")
c.execute("DROP TABLE IF EXISTS test_drop_create_tb")
c.execute("DROP TABLE IF EXISTS test_drop_create_tb_dim")

# Fact table
test_query(
c,
"CREATE FACT TABLE test_tb(id int, sn string null, f float,"
"CREATE FACT TABLE test_drop_create_tb(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int)) primary index id",
)

# Dimension table
test_query(
c,
"CREATE DIMENSION TABLE test_tb_dim(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int))",
"CREATE DIMENSION TABLE test_drop_create_tb_dim(id int, sn string null"
", f float, d date, dt datetime, b bool, a array(int))",
)

# Create join index
test_query(c, "CREATE JOIN INDEX test_db_join_idx ON test_tb_dim(id, sn, f)")
test_query(
c,
"CREATE JOIN INDEX test_drop_create_db_join_idx ON "
"test_drop_create_tb_dim(id, sn, f)",
)

# Create aggregating index
test_query(
c,
"CREATE AGGREGATING INDEX test_db_agg_idx ON "
"test_tb(id, sum(f), count(dt))",
"CREATE AGGREGATING INDEX test_drop_create_db_agg_idx ON "
"test_drop_create_tb(id, sum(f), count(dt))",
)

# Drop join index
test_query(c, "DROP JOIN INDEX test_db_join_idx")
test_query(c, "DROP JOIN INDEX test_drop_create_db_join_idx")

# Drop aggregating index
test_query(c, "DROP AGGREGATING INDEX test_db_agg_idx")
test_query(c, "DROP AGGREGATING INDEX test_drop_create_db_agg_idx")

# Test drop once again
test_query(c, "DROP TABLE test_tb")
test_query(c, "DROP TABLE IF EXISTS test_tb")
test_query(c, "DROP TABLE test_drop_create_tb")
test_query(c, "DROP TABLE IF EXISTS test_drop_create_tb")

test_query(c, "DROP TABLE test_tb_dim")
test_query(c, "DROP TABLE IF EXISTS test_tb_dim")
test_query(c, "DROP TABLE test_drop_create_tb_dim")
test_query(c, "DROP TABLE IF EXISTS test_drop_create_tb_dim")


def test_insert(connection: Connection) -> None:
Expand All @@ -148,20 +158,20 @@ def test_empty_query(c: Cursor, query: str) -> None:
c.fetchall()

with connection.cursor() as c:
c.execute("DROP TABLE IF EXISTS test_tb")
c.execute("DROP TABLE IF EXISTS test_insert_tb")
c.execute(
"CREATE FACT TABLE test_tb(id int, sn string null, f float,"
"CREATE FACT TABLE test_insert_tb(id int, sn string null, f float,"
"d date, dt datetime, b bool, a array(int)) primary index id"
)

test_empty_query(
c,
"INSERT INTO test_tb VALUES (1, 'sn', 1.1, '2021-01-01',"
"INSERT INTO test_insert_tb VALUES (1, 'sn', 1.1, '2021-01-01',"
"'2021-01-01 01:01:01', true, [1, 2, 3])",
)

assert (
c.execute("SELECT * FROM test_tb ORDER BY test_tb.id") == 1
c.execute("SELECT * FROM test_insert_tb ORDER BY test_insert_tb.id") == 1
), "Invalid data length in table after insert"

assert_deep_eq(
Expand Down