@@ -549,6 +549,74 @@ def test_get_table_column_schema(self):
)
self .assertEqual (result , expected )
def test_peek_iterator_aborted (self ):
"""
Checking that an Aborted exception is retried in case it happened
while streaming the first element with a PeekIterator.
"""
from google .api_core .exceptions import Aborted
from google .cloud .spanner_dbapi .connection import connect
with mock .patch (
"google.cloud.spanner_v1.instance.Instance.exists" , return_value = True ,
):
with mock .patch (
"google.cloud.spanner_v1.database.Database.exists" , return_value = True ,
):
connection = connect ("test-instance" , "test-database" )
cursor = connection .cursor ()
with mock .patch (
"google.cloud.spanner_dbapi.utils.PeekIterator.__init__" ,
side_effect = (Aborted ("Aborted" ), None ),
):
with mock .patch (
"google.cloud.spanner_dbapi.connection.Connection.retry_transaction"
) as retry_mock :
with mock .patch (
"google.cloud.spanner_dbapi.connection.Connection.run_statement" ,
return_value = ((1 , 2 , 3 ), None ),
):
cursor .execute ("SELECT * FROM table_name" )
retry_mock .assert_called_with ()
def test_peek_iterator_aborted_autocommit (self ):
"""
Checking that an Aborted exception is retried in case it happened while
streaming the first element with a PeekIterator in autocommit mode.
"""
from google .api_core .exceptions import Aborted
from google .cloud .spanner_dbapi .connection import connect
with mock .patch (
"google.cloud.spanner_v1.instance.Instance.exists" , return_value = True ,
):
with mock .patch (
"google.cloud.spanner_v1.database.Database.exists" , return_value = True ,
):
connection = connect ("test-instance" , "test-database" )
connection .autocommit = True
cursor = connection .cursor ()
with mock .patch (
"google.cloud.spanner_dbapi.utils.PeekIterator.__init__" ,
side_effect = (Aborted ("Aborted" ), None ),
):
with mock .patch (
"google.cloud.spanner_dbapi.connection.Connection.retry_transaction"
) as retry_mock :
with mock .patch (
"google.cloud.spanner_dbapi.connection.Connection.run_statement" ,
return_value = ((1 , 2 , 3 ), None ),
):
with mock .patch (
"google.cloud.spanner_v1.database.Database.snapshot"
):
cursor .execute ("SELECT * FROM table_name" )
retry_mock .assert_called_with ()
def test_fetchone_retry_aborted (self ):
"""Check that aborted fetch re-executing transaction."""
from google .api_core .exceptions import Aborted