Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/firebolt_db/firebolt_async_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from asyncio import Lock
from types import ModuleType
from typing import Any, Iterator, List, Optional, Tuple
from typing import Any, Dict, Iterator, List, Optional, Tuple

import firebolt.async_db as async_dbapi
from firebolt.async_db import Connection
Expand Down Expand Up @@ -53,14 +53,24 @@ def arraysize(self, value: int) -> None:
def rowcount(self) -> int:
return self._cursor.rowcount

def execute(self, operation: str, parameters: Optional[Tuple] = None) -> None:
self.await_(self._execute(operation, parameters))
def execute(
self,
operation: str,
parameters: Optional[Tuple] = None,
set_parameters: Optional[Dict] = None,
) -> None:
self.await_(self._execute(operation, parameters, set_parameters=set_parameters))

async def _execute(
self, operation: str, parameters: Optional[Tuple] = None
self,
operation: str,
parameters: Optional[Tuple] = None,
set_parameters: Optional[Dict] = None,
) -> None:
async with self._adapt_connection._execute_mutex:
await self._cursor.execute(operation, parameters)
await self._cursor.execute(
operation, parameters, set_parameters=set_parameters
)
if self._cursor.description:
self._rows = await self._cursor.fetchall()
else:
Expand Down
8 changes: 5 additions & 3 deletions src/firebolt_db/firebolt_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ def do_execute(
parameters: Tuple[str, Any],
context: Optional[ExecutionContext] = None,
) -> None:
cursor.execute(statement, set_parameters=self._set_parameters)
cursor.execute(
statement, parameters=parameters, set_parameters=self._set_parameters
)

def do_rollback(self, dbapi_connection: AlchemyConnection) -> None:
pass
Expand Down Expand Up @@ -300,5 +302,5 @@ def do_commit(self, dbapi_connection: AlchemyConnection) -> None:
dialect = FireboltDialect


def get_is_nullable(column_is_nullable: str) -> bool:
return column_is_nullable.lower() == "yes"
def get_is_nullable(column_is_nullable: int) -> bool:
return column_is_nullable == 1
2 changes: 2 additions & 0 deletions tests/integration/test_sqlalchemy_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test_data_write(self, connection: Connection, fact_table_name: str):
connection.execute(
f"INSERT INTO {fact_table_name}(idx, dummy) VALUES (1, 'some_text')"
)
result = connection.execute(f"SELECT * FROM {fact_table_name} WHERE idx=?", 1)
assert result.fetchall() == [(1, "some_text")]
result = connection.execute(f"SELECT * FROM {fact_table_name}")
assert len(result.fetchall()) == 1
# Update not supported
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_firebolt_async_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_cursor() -> AsyncCursorWrapper:
assert wrapper.description == "dummy"
assert wrapper.rowcount == -1
async_cursor.execute.assert_awaited_once_with(
"INSERT INTO test(a, b) VALUES (?, ?)", [(1, "a")]
"INSERT INTO test(a, b) VALUES (?, ?)", [(1, "a")], set_parameters=None
)
async_cursor.fetchall.assert_awaited_once()

Expand All @@ -105,7 +105,7 @@ def test_cursor() -> AsyncCursorWrapper:
assert wrapper.description is None
assert wrapper.rowcount == 100
async_cursor.execute.assert_awaited_once_with(
"INSERT INTO test(a, b) VALUES (?, ?)", [(1, "a")]
"INSERT INTO test(a, b) VALUES (?, ?)", [(1, "a")], set_parameters=None
)
async_cursor.fetchall.assert_not_awaited()

Expand Down
22 changes: 13 additions & 9 deletions tests/unit/test_firebolt_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,15 @@ def test_do_execute(
self, dialect: FireboltDialect, cursor: mock.Mock(spec=MockCursor)
):
dialect._set_parameters = {"a": "b"}
dialect.do_execute(cursor, "SELECT *", None, None)
cursor.execute.assert_called_once_with("SELECT *", set_parameters={"a": "b"})
dialect.do_execute(cursor, "SELECT *", None)
cursor.execute.assert_called_once_with(
"SELECT *", parameters=None, set_parameters={"a": "b"}
)
cursor.execute.reset_mock()
dialect.do_execute(cursor, "SELECT *", (1, 22), None)
cursor.execute.assert_called_once_with(
"SELECT *", parameters=(1, 22), set_parameters={"a": "b"}
)

def test_schema_names(
self, dialect: FireboltDialect, connection: mock.Mock(spec=MockDBApi)
Expand Down Expand Up @@ -134,8 +141,8 @@ def getitem(self, idx):
return mock.Mock(__getitem__=getitem)

connection.execute.return_value = [
multi_column_row(["name1", "INT", "YES"]),
multi_column_row(["name2", "date", "no"]),
multi_column_row(["name1", "INT", 1]),
multi_column_row(["name2", "date", 0]),
]

expected_query = """
Expand Down Expand Up @@ -221,11 +228,8 @@ def test_unicode_description(


def test_get_is_nullable():
assert firebolt_db.firebolt_dialect.get_is_nullable("YES")
assert firebolt_db.firebolt_dialect.get_is_nullable("yes")
assert not firebolt_db.firebolt_dialect.get_is_nullable("NO")
assert not firebolt_db.firebolt_dialect.get_is_nullable("no")
assert not firebolt_db.firebolt_dialect.get_is_nullable("ABC")
assert firebolt_db.firebolt_dialect.get_is_nullable(1)
assert not firebolt_db.firebolt_dialect.get_is_nullable(0)


def test_types():
Expand Down