diff --git a/src/firebolt/async_db/_types.py b/src/firebolt/async_db/_types.py index 9ad06e2cd48..0d8e17f3d25 100644 --- a/src/firebolt/async_db/_types.py +++ b/src/firebolt/async_db/_types.py @@ -3,10 +3,10 @@ from collections import namedtuple from datetime import date, datetime, timezone from enum import Enum -from typing import Sequence, Union +from typing import List, Sequence, Union from sqlparse import parse as parse_sql # type: ignore -from sqlparse.sql import Token, TokenList # type: ignore +from sqlparse.sql import Statement, Token, TokenList # type: ignore from sqlparse.tokens import Token as TokenType # type: ignore try: @@ -224,10 +224,9 @@ def format_value(value: ParameterType) -> str: raise DataError(f"unsupported parameter type {type(value)}") -def format_sql(query: str, parameters: Sequence[ParameterType]) -> str: +def format_statement(statement: Statement, parameters: Sequence[ParameterType]) -> str: """ - Substitute placeholders in queries with provided values. - '?' symbol is used as a placeholder. Using '\\?' would result in a plain '?' + Substitute placeholders in a sqlparse statement with provided values. """ idx = 0 @@ -245,16 +244,11 @@ def process_token(token: Token) -> Token: return Token(TokenType.Text, formatted) if isinstance(token, TokenList): # Process all children tokens - token.tokens = [process_token(t) for t in token.tokens] - return token - parsed = parse_sql(query) - if not parsed: - return query - if len(parsed) > 1: - raise NotSupportedError("Multi-statement queries are not supported") + return TokenList([process_token(t) for t in token.tokens]) + return token - formatted_sql = str(process_token(parsed[0])) + formatted_sql = str(process_token(statement)).rstrip(";") if idx < len(parameters): raise DataError( @@ -263,3 +257,24 @@ def process_token(token: Token) -> Token: ) return formatted_sql + + +def split_format_sql( + query: str, parameters: Sequence[Sequence[ParameterType]] +) -> List[str]: + """ + Split a query into separate statement, and format it with parameters + if it's a single statement + Trying to format a multi-statement query would result in NotSupportedError + """ + statements = parse_sql(query) + if not statements: + return [query] + + if parameters: + if len(statements) > 1: + raise NotSupportedError( + "formatting multistatement queries is not supported" + ) + return [format_statement(statements[0], paramset) for paramset in parameters] + return [str(st).strip().rstrip(";") for st in statements] diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 2418bd4d4a2..8f39cf52e48 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -6,7 +6,6 @@ from enum import Enum from functools import wraps from inspect import cleandoc -from json import JSONDecodeError from types import TracebackType from typing import ( TYPE_CHECKING, @@ -27,9 +26,9 @@ Column, ParameterType, RawColType, - format_sql, parse_type, parse_value, + split_format_sql, ) from firebolt.async_db.util import is_db_available, is_engine_running from firebolt.client import AsyncClient @@ -38,7 +37,6 @@ DataError, EngineNotRunningError, FireboltDatabaseError, - NotSupportedError, OperationalError, ProgrammingError, QueryNotRunError, @@ -55,6 +53,7 @@ class CursorState(Enum): NONE = 1 + ERROR = 2 DONE = 3 CLOSED = 4 @@ -99,6 +98,8 @@ class BaseCursor: "_rows", "_idx", "_idx_lock", + "_row_sets", + "_next_set_idx", ) default_arraysize = 1 @@ -107,8 +108,15 @@ def __init__(self, client: AsyncClient, connection: Connection): self.connection = connection self._client = client self._arraysize = self.default_arraysize + # These fields initialized here for type annotations purpose self._rows: Optional[List[List[RawColType]]] = None self._descriptions: Optional[List[Column]] = None + self._row_sets: List[ + Tuple[int, Optional[List[Column]], Optional[List[List[RawColType]]]] + ] = [] + self._rowcount = -1 + self._idx = 0 + self._next_set_idx = 0 self._reset() def __del__(self) -> None: @@ -164,24 +172,58 @@ def close(self) -> None: # remove typecheck skip after connection is implemented self.connection._remove_cursor(self) # type: ignore - def _store_query_data(self, response: Response) -> None: + def _append_query_data(self, response: Response) -> None: """Store information about executed query from httpx response.""" + row_set: Tuple[ + int, Optional[List[Column]], Optional[List[List[RawColType]]] + ] = (-1, None, None) + # Empty response is returned for insert query - if response.headers.get("content-length", "") == "0": - return - try: - query_data = response.json() - self._rowcount = int(query_data["rows"]) - self._descriptions = [ - Column(d["name"], parse_type(d["type"]), None, None, None, None, None) - for d in query_data["meta"] - ] - - # Parse data during fetch - self._rows = query_data["data"] - except (KeyError, JSONDecodeError) as err: - raise DataError(f"Invalid query data format: {str(err)}") + if response.headers.get("content-length", "") != "0": + try: + query_data = response.json() + rowcount = int(query_data["rows"]) + descriptions = [ + Column( + d["name"], parse_type(d["type"]), None, None, None, None, None + ) + for d in query_data["meta"] + ] + + # Parse data during fetch + rows = query_data["data"] + row_set = (rowcount, descriptions, rows) + except (KeyError, ValueError) as err: + raise DataError(f"Invalid query data format: {str(err)}") + + self._row_sets.append(row_set) + if self._next_set_idx == 0: + # Populate values for first set + self._pop_next_set() + + @check_not_closed + @check_query_executed + def nextset(self) -> Optional[bool]: + """ + Skip to the next available set, discarding any remaining rows + from the current set. + Returns True if operation was successful, + None if there are no more sets to retrive + """ + return self._pop_next_set() + + def _pop_next_set(self) -> Optional[bool]: + """ + Same functionality as .nextset, but doesn't check that query has been executed. + """ + if self._next_set_idx >= len(self._row_sets): + return None + self._rowcount, self._descriptions, self._rows = self._row_sets[ + self._next_set_idx + ] + self._next_set_idx += 1 + return True async def _raise_if_error(self, resp: Response) -> None: """Raise a proper error if any""" @@ -213,29 +255,52 @@ def _reset(self) -> None: self._descriptions = None self._rowcount = -1 self._idx = 0 + self._row_sets = [] + self._next_set_idx = 0 async def _do_execute_request( self, query: str, - parameters: Optional[Sequence[ParameterType]] = None, + parameters: Sequence[Sequence[ParameterType]], set_parameters: Optional[Dict] = None, - ) -> Response: - if parameters: - query = format_sql(query, parameters) - - resp = await self._client.request( - url="/", - method="POST", - params={ - "database": self.connection.database, - "output_format": JSON_OUTPUT_FORMAT, - **(set_parameters or dict()), - }, - content=query, - ) + ) -> None: + self._reset() + try: + + queries = split_format_sql(query, parameters) + + for query in queries: + + start_time = time.time() + # our CREATE EXTERNAL TABLE queries currently require credentials, + # so we will skip logging those queries. + # https://docs.firebolt.io/sql-reference/commands/ddl-commands#create-external-table + if not re.search("aws_key_id|credentials", query, flags=re.IGNORECASE): + logger.debug(f"Running query: {query}") + + resp = await self._client.request( + url="/", + method="POST", + params={ + "database": self.connection.database, + "output_format": JSON_OUTPUT_FORMAT, + **(set_parameters or dict()), + }, + content=query, + ) + + await self._raise_if_error(resp) + self._append_query_data(resp) + logger.info( + f"Query fetched {self.rowcount} rows in" + f" {time.time() - start_time} seconds" + ) + + self._state = CursorState.DONE - await self._raise_if_error(resp) - return resp + except Exception: + self._state = CursorState.ERROR + raise @check_not_closed async def execute( @@ -245,21 +310,9 @@ async def execute( set_parameters: Optional[Dict] = None, ) -> int: """Prepare and execute a database query. Return row count.""" - start_time = time.time() - # our CREATE EXTERNAL TABLE queries currently require credentials, - # so we will skip logging those queries. - # https://docs.firebolt.io/sql-reference/commands/ddl-commands#create-external-table - if not re.search("aws_key_id|credentials", query, flags=re.IGNORECASE): - logger.debug(f"Running query: {query}") - - self._reset() - resp = await self._do_execute_request(query, parameters, set_parameters) - self._store_query_data(resp) - self._state = CursorState.DONE - logger.info( - f"Query fetched {self.rowcount} rows in {time.time() - start_time} seconds" - ) + params_list = [parameters] if parameters else [] + await self._do_execute_request(query, params_list, set_parameters) return self.rowcount @check_not_closed @@ -270,19 +323,7 @@ async def executemany( Prepare and execute a database query against all parameter sequences provided. Return last query row count. """ - - if len(parameters_seq) > 1: - raise NotSupportedError( - "Parameterized multi-statement queries are not supported" - ) - - self._reset() - resp = None - for parameters in parameters_seq: - resp = await self._do_execute_request(query, parameters) - if resp is not None: - self._store_query_data(resp) - self._state = CursorState.DONE + await self._do_execute_request(query, parameters_seq) return self.rowcount def _parse_row(self, row: List[RawColType]) -> List[ColType]: diff --git a/tests/integration/dbapi/async/test_queries_async.py b/tests/integration/dbapi/async/test_queries_async.py index e7c24c33880..205298f45b8 100644 --- a/tests/integration/dbapi/async/test_queries_async.py +++ b/tests/integration/dbapi/async/test_queries_async.py @@ -237,3 +237,44 @@ async def test_empty_query(c: Cursor, query: str, params: tuple) -> None: [params + ["?"]], "Invalid data in table after parameterized insert", ) + + +@mark.asyncio +async def test_multi_statement_query(connection: Connection) -> None: + """Query parameters are handled properly""" + + with connection.cursor() as c: + await c.execute("DROP TABLE IF EXISTS test_tb_multi_statement") + await c.execute( + "CREATE FACT TABLE test_tb_multi_statement(i int, s string) primary index i" + ) + + assert ( + await c.execute( + "INSERT INTO test_tb_multi_statement values (1, 'a'), (2, 'b');" + "SELECT * FROM test_tb_multi_statement" + ) + == -1 + ), "Invalid row count returned for insert" + assert c.rowcount == -1, "Invalid row count" + assert c.description is None, "Invalid description" + + assert c.nextset() + + assert c.rowcount == 2, "Invalid select row count" + assert_deep_eq( + c.description, + [ + Column("i", int, None, None, None, None, None), + Column("s", str, None, None, None, None, None), + ], + "Invalid select query description", + ) + + assert_deep_eq( + await c.fetchall(), + [[1, "a"], [2, "b"]], + "Invalid data in table after parameterized insert", + ) + + assert c.nextset() is None diff --git a/tests/integration/dbapi/sync/test_queries.py b/tests/integration/dbapi/sync/test_queries.py index fa00f9b129f..f730405d25b 100644 --- a/tests/integration/dbapi/sync/test_queries.py +++ b/tests/integration/dbapi/sync/test_queries.py @@ -229,3 +229,43 @@ def test_empty_query(c: Cursor, query: str, params: tuple) -> None: [params + ["?"]], "Invalid data in table after parameterized insert", ) + + +def test_multi_statement_query(connection: Connection) -> None: + """Query parameters are handled properly""" + + with connection.cursor() as c: + c.execute("DROP TABLE IF EXISTS test_tb_multi_statement") + c.execute( + "CREATE FACT TABLE test_tb_multi_statement(i int, s string) primary index i" + ) + + assert ( + c.execute( + "INSERT INTO test_tb_multi_statement values (1, 'a'), (2, 'b');" + "SELECT * FROM test_tb_multi_statement" + ) + == -1 + ), "Invalid row count returned for insert" + assert c.rowcount == -1, "Invalid row count" + assert c.description is None, "Invalid description" + + assert c.nextset() + + assert c.rowcount == 2, "Invalid select row count" + assert_deep_eq( + c.description, + [ + Column("i", int, None, None, None, None, None), + Column("s", str, None, None, None, None, None), + ], + "Invalid select query description", + ) + + assert_deep_eq( + c.fetchall(), + [[1, "a"], [2, "b"]], + "Invalid data in table after parameterized insert", + ) + + assert c.nextset() is None diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index 5dda697db74..ead05bb0174 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -13,7 +13,6 @@ DataError, EngineNotRunningError, FireboltDatabaseError, - NotSupportedError, OperationalError, QueryNotRunError, ) @@ -37,6 +36,16 @@ async def test_cursor_state( await cursor.execute("select") assert cursor._state == CursorState.DONE + def error_query_callback(*args, **kwargs): + raise Exception() + + httpx_mock.add_callback(error_query_callback, url=query_url) + + cursor._reset() + with raises(Exception): + await cursor.execute("select") + assert cursor._state == CursorState.ERROR + cursor._reset() assert cursor._state == CursorState.NONE @@ -55,10 +64,7 @@ async def test_closed_cursor(cursor: Cursor): ("fetchmany", ()), ("fetchall", ()), ) - methods = ( - "setinputsizes", - "setoutputsize", - ) + methods = ("setinputsizes", "setoutputsize", "nextset") cursor.close() @@ -109,6 +115,9 @@ async def test_cursor_no_query( with raises(QueryNotRunError): await getattr(cursor, amethod)() + with raises(QueryNotRunError): + await cursor.nextset() + with raises(QueryNotRunError): [r async for r in cursor] @@ -147,8 +156,8 @@ async def test_cursor_execute( """Cursor is able to execute query, all fields are populated properly.""" for query in ( - lambda: cursor.execute("select *"), - lambda: cursor.executemany("select *", [None]), + lambda: cursor.execute("select * from t"), + lambda: cursor.executemany("select * from t", []), ): # Query with json output httpx_mock.add_callback(auth_callback, url=auth_url) @@ -192,7 +201,7 @@ async def test_cursor_execute_error( """Cursor handles all types of errors properly.""" for query in ( lambda: cursor.execute("select *"), - lambda: cursor.executemany("select *", [None]), + lambda: cursor.executemany("select *", []), ): httpx_mock.add_callback(auth_callback, url=auth_url) @@ -204,6 +213,7 @@ def http_error(**kwargs): with raises(StreamError) as excinfo: await query() + assert cursor._state == CursorState.ERROR assert str(excinfo.value) == "httpx error", "Invalid query error message" # HTTP error @@ -212,6 +222,7 @@ def http_error(**kwargs): await query() errmsg = str(excinfo.value) + assert cursor._state == CursorState.ERROR assert "Bad Request" in errmsg, "Invalid query error message" # Database query error @@ -223,6 +234,7 @@ def http_error(**kwargs): with raises(OperationalError) as excinfo: await query() + assert cursor._state == CursorState.ERROR assert ( str(excinfo.value) == "Error executing query:\nQuery error message" ), "Invalid authentication error message" @@ -239,6 +251,7 @@ def http_error(**kwargs): ) with raises(FireboltDatabaseError) as excinfo: await query() + assert cursor._state == CursorState.ERROR # Engine is not running error httpx_mock.add_response( @@ -255,6 +268,7 @@ def http_error(**kwargs): ) with raises(EngineNotRunningError) as excinfo: await query() + assert cursor._state == CursorState.ERROR httpx_mock.reset(True) @@ -410,7 +424,39 @@ async def test_set_parameters( @mark.asyncio -async def test_cursor_multi_statement(cursor: Cursor): +async def test_cursor_multi_statement( + httpx_mock: HTTPXMock, + auth_callback: Callable, + auth_url: str, + query_callback: Callable, + insert_query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_description: List[Column], + python_query_data: List[List[ColType]], +): """executemany with multiple parameter sets is not supported""" - with raises(NotSupportedError): - await cursor.executemany("select ?", [(1,), (2,)]) + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_callback(query_callback, url=query_url) + httpx_mock.add_callback(insert_query_callback, url=query_url) + + rc = await cursor.execute("select * from t; insert into t values (1, 2)") + assert rc == len(python_query_data), "Invalid row count returned" + assert cursor.rowcount == len(python_query_data), "Invalid cursor row count" + for i, (desc, exp) in enumerate(zip(cursor.description, python_query_description)): + assert desc == exp, f"Invalid column description at position {i}" + + for i in range(cursor.rowcount): + assert ( + await cursor.fetchone() == python_query_data[i] + ), f"Invalid data row at position {i}" + + assert cursor.nextset() + assert cursor.rowcount == -1, "Invalid cursor row count" + assert cursor.description is None, "Invalid cursor description" + with raises(DataError) as exc_info: + await cursor.fetchall() + + assert str(exc_info.value) == "no rows to fetch", "Invalid error message" + + assert cursor.nextset() is None diff --git a/tests/unit/async_db/test_typing_format.py b/tests/unit/async_db/test_typing_format.py index bb03b79b29d..b394cc7e3b4 100644 --- a/tests/unit/async_db/test_typing_format.py +++ b/tests/unit/async_db/test_typing_format.py @@ -1,9 +1,16 @@ from datetime import date, datetime, timedelta, timezone +from typing import List from pytest import mark, raises +from sqlparse import parse +from sqlparse.sql import Statement -from firebolt.async_db import DataError -from firebolt.async_db._types import format_sql, format_value +from firebolt.async_db import DataError, NotSupportedError +from firebolt.async_db._types import ( + format_statement, + format_value, + split_format_sql, +) @mark.parametrize( @@ -45,53 +52,98 @@ def test_format_value_errors() -> None: assert str(exc_info.value) == "unsupported parameter type " +def to_statement(sql: str) -> Statement: + return parse(sql)[0] + + @mark.parametrize( - "sql,params,result", + "statement,params,result", [ - ("", (), ""), - ("select * from table", (), "select * from table"), + (to_statement("select * from table"), (), "select * from table"), ( - "select * from table where id == ?", + to_statement("select * from table where id == ?"), (1,), "select * from table where id == 1", ), ( - "select * from table where id == '?'", + to_statement("select * from table where id == '?'"), (), "select * from table where id == '?'", ), ( - "insert into table values (?, ?, '?')", + to_statement("insert into table values (?, ?, '?')"), (1, "1"), "insert into table values (1, '1', '?')", ), ( - "select * from t where /*comment ?*/ id == ?", + to_statement("select * from t where /*comment ?*/ id == ?"), ("*/ 1 == 1 or /*",), "select * from t where /*comment ?*/ id == '*/ 1 == 1 or /*'", ), ( - "select * from t where id == ?", + to_statement("select * from t where id == ?"), ("' or '' == '",), r"select * from t where id == '\' or \'\' == \''", ), ], ) -def test_format_sql(sql: str, params: tuple, result: str) -> None: - assert format_sql(sql, params) == result, "Invalid format sql result" +def test_format_statement(statement: Statement, params: tuple, result: str) -> None: + assert format_statement(statement, params) == result, "Invalid format sql result" -def test_format_sql_errors() -> None: +def test_format_statement_errors() -> None: with raises(DataError) as exc_info: - format_sql("?", []) + format_statement(to_statement("?"), []) assert ( str(exc_info.value) == "not enough parameters provided for substitution: given 0, found one more" ), "Invalid not enought parameters error" with raises(DataError) as exc_info: - format_sql("?", (1, 2)) + format_statement(to_statement("?"), (1, 2)) assert ( str(exc_info.value) == "too many parameters provided for substitution: given 2, used only 1" ), "Invalid not enought parameters error" + + +@mark.parametrize( + "query,params,result", + [ + ("", (), [""]), + ("select * from t", (), ["select * from t"]), + ("select * from t;", (), ["select * from t"]), + ("select * from t where id == ?", ((1,),), ["select * from t where id == 1"]), + ("select * from t where id == ?;", ((1,),), ["select * from t where id == 1"]), + ( + "select * from t;insert into t values (1, 2)", + (), + ["select * from t", "insert into t values (1, 2)"], + ), + ( + "insert into t values (1, 2);select * from t;", + (), + ["insert into t values (1, 2)", "select * from t"], + ), + ( + "select * from t where id == ?", + ((1,), (2,)), + ["select * from t where id == 1", "select * from t where id == 2"], + ), + ], +) +def test_split_format_sql(query: str, params: tuple, result: List[str]) -> None: + assert ( + split_format_sql(query, params) == result + ), "Invalid split and format sql result" + + +def test_split_format_error() -> None: + with raises(NotSupportedError) as exc_info: + split_format_sql( + "select * from t where id == ?; insert into t values (?, ?)", ((1, 2, 3),) + ) + + assert ( + str(exc_info.value) == "formatting multistatement queries is not supported" + ), "Invalid not supported error message" diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index 13da4c989e4..652d7b56d03 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -9,7 +9,6 @@ from firebolt.common.exception import ( CursorClosedError, DataError, - NotSupportedError, OperationalError, QueryNotRunError, ) @@ -33,6 +32,16 @@ def test_cursor_state( cursor.execute("select") assert cursor._state == CursorState.DONE + def error_query_callback(*args, **kwargs): + raise Exception() + + httpx_mock.add_callback(error_query_callback, url=query_url) + + cursor._reset() + with raises(Exception): + cursor.execute("select") + assert cursor._state == CursorState.ERROR + cursor._reset() assert cursor._state == CursorState.NONE @@ -51,6 +60,7 @@ def test_closed_cursor(cursor: Cursor): ("fetchall", ()), ("setinputsizes", (cursor, [0])), ("setoutputsize", (cursor, 0)), + ("nextset", (cursor, [])), ) cursor.close() @@ -88,6 +98,7 @@ def test_cursor_no_query( "fetchone", "fetchmany", "fetchall", + "nextset", ) httpx_mock.add_callback(auth_callback, url=auth_url) @@ -135,7 +146,7 @@ def test_cursor_execute( for query in ( lambda: cursor.execute("select *"), - lambda: cursor.executemany("select *", [None]), + lambda: cursor.executemany("select *", []), ): # Query with json output httpx_mock.add_callback(auth_callback, url=auth_url) @@ -174,7 +185,7 @@ def test_cursor_execute_error( """Cursor handles all types of errors properly.""" for query in ( lambda: cursor.execute("select *"), - lambda: cursor.executemany("select *", [None]), + lambda: cursor.executemany("select *", []), ): httpx_mock.add_callback(auth_callback, url=auth_url) @@ -186,6 +197,7 @@ def http_error(**kwargs): with raises(StreamError) as excinfo: query() + assert cursor._state == CursorState.ERROR assert str(excinfo.value) == "httpx error", "Invalid query error message" # HTTP error @@ -194,6 +206,7 @@ def http_error(**kwargs): query() errmsg = str(excinfo.value) + assert cursor._state == CursorState.ERROR assert "Bad Request" in errmsg, "Invalid query error message" # Database query error @@ -205,6 +218,7 @@ def http_error(**kwargs): with raises(OperationalError) as excinfo: query() + assert cursor._state == CursorState.ERROR assert ( str(excinfo.value) == "Error executing query:\nQuery error message" ), "Invalid authentication error message" @@ -357,7 +371,39 @@ def test_set_parameters( cursor.execute("select 1", set_parameters=set_params) -def test_cursor_multi_statement(cursor: Cursor): +def test_cursor_multi_statement( + httpx_mock: HTTPXMock, + auth_callback: Callable, + auth_url: str, + query_callback: Callable, + insert_query_callback: Callable, + query_url: str, + cursor: Cursor, + python_query_description: List[Column], + python_query_data: List[List[ColType]], +): """executemany with multiple parameter sets is not supported""" - with raises(NotSupportedError): - cursor.executemany("select ?", [(1,), (2,)]) + httpx_mock.add_callback(auth_callback, url=auth_url) + httpx_mock.add_callback(query_callback, url=query_url) + httpx_mock.add_callback(insert_query_callback, url=query_url) + + rc = cursor.execute("select * from t; insert into t values (1, 2)") + assert rc == len(python_query_data), "Invalid row count returned" + assert cursor.rowcount == len(python_query_data), "Invalid cursor row count" + for i, (desc, exp) in enumerate(zip(cursor.description, python_query_description)): + assert desc == exp, f"Invalid column description at position {i}" + + for i in range(cursor.rowcount): + assert ( + cursor.fetchone() == python_query_data[i] + ), f"Invalid data row at position {i}" + + assert cursor.nextset() + assert cursor.rowcount == -1, "Invalid cursor row count" + assert cursor.description is None, "Invalid cursor description" + with raises(DataError) as exc_info: + cursor.fetchall() + + assert str(exc_info.value) == "no rows to fetch", "Invalid error message" + + assert cursor.nextset() is None