Skip to content

Commit

Permalink
feat(db_api): add an ability to set ReadOnly/ReadWrite connection mode (
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Gurov authored Oct 5, 2021
1 parent aaec1db commit cd3b950
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 82 deletions.
72 changes: 69 additions & 3 deletions google/cloud/spanner_dbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import spanner_v1 as spanner
from google.cloud.spanner_v1.session import _get_retry_delay
from google.cloud.spanner_v1.snapshot import Snapshot

from google.cloud.spanner_dbapi._helpers import _execute_insert_heterogenous
from google.cloud.spanner_dbapi._helpers import _execute_insert_homogenous
Expand Down Expand Up @@ -50,15 +51,31 @@ class Connection:
:type database: :class:`~google.cloud.spanner_v1.database.Database`
:param database: The database to which the connection is linked.
:type read_only: bool
:param read_only:
Flag to indicate that the connection may only execute queries and no update or DDL statements.
If True, the connection will use a single use read-only transaction with strong timestamp
bound for each new statement, and will immediately see any changes that have been committed by
any other transaction.
If autocommit is false, the connection will automatically start a new multi use read-only transaction
with strong timestamp bound when the first statement is executed. This read-only transaction will be
used for all subsequent statements until either commit() or rollback() is called on the connection. The
read-only transaction will read from a consistent snapshot of the database at the time that the
transaction started. This means that the transaction will not see any changes that have been
committed by other transactions since the start of the read-only transaction. Commit or rolling back
the read-only transaction is semantically the same, and only indicates that the read-only transaction
should end a that a new one should be started when the next statement is executed.
"""

def __init__(self, instance, database):
def __init__(self, instance, database, read_only=False):
self._instance = instance
self._database = database
self._ddl_statements = []

self._transaction = None
self._session = None
self._snapshot = None
# SQL statements, which were executed
# within the current transaction
self._statements = []
Expand All @@ -69,6 +86,7 @@ def __init__(self, instance, database):
# this connection should be cleared on the
# connection close
self._own_pool = True
self._read_only = read_only

@property
def autocommit(self):
Expand Down Expand Up @@ -123,6 +141,30 @@ def instance(self):
"""
return self._instance

@property
def read_only(self):
"""Flag: the connection can be used only for database reads.
Returns:
bool:
True if the connection may only be used for database reads.
"""
return self._read_only

@read_only.setter
def read_only(self, value):
"""`read_only` flag setter.
Args:
value (bool): True for ReadOnly mode, False for ReadWrite.
"""
if self.inside_transaction:
raise ValueError(
"Connection read/write mode can't be changed while a transaction is in progress. "
"Commit or rollback the current transaction and try again."
)
self._read_only = value

def _session_checkout(self):
"""Get a Cloud Spanner session from the pool.
Expand Down Expand Up @@ -231,6 +273,22 @@ def transaction_checkout(self):

return self._transaction

def snapshot_checkout(self):
"""Get a Cloud Spanner snapshot.
Initiate a new multi-use snapshot, if there is no snapshot in
this connection yet. Return the existing one otherwise.
:rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot`
:returns: A Cloud Spanner snapshot object, ready to use.
"""
if self.read_only and not self.autocommit:
if not self._snapshot:
self._snapshot = Snapshot(self._session_checkout(), multi_use=True)
self._snapshot.begin()

return self._snapshot

def _raise_if_closed(self):
"""Helper to check the connection state before running a query.
Raises an exception if this connection is closed.
Expand Down Expand Up @@ -259,14 +317,18 @@ def commit(self):
This method is non-operational in autocommit mode.
"""
self._snapshot = None

if self._autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
return

self.run_prior_DDL_statements()
if self.inside_transaction:
try:
self._transaction.commit()
if not self.read_only:
self._transaction.commit()

self._release_session()
self._statements = []
except Aborted:
Expand All @@ -279,10 +341,14 @@ def rollback(self):
This is a no-op if there is no active transaction or if the connection
is in autocommit mode.
"""
self._snapshot = None

if self._autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
elif self._transaction:
self._transaction.rollback()
if not self.read_only:
self._transaction.rollback()

self._release_session()
self._statements = []

Expand Down
86 changes: 47 additions & 39 deletions google/cloud/spanner_dbapi/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ def execute(self, sql, args=None):

# Classify whether this is a read-only SQL statement.
try:
if self.connection.read_only:
self._handle_DQL(sql, args or None)
return

classification = parse_utils.classify_stmt(sql)
if classification == parse_utils.STMT_DDL:
ddl_statements = []
Expand Down Expand Up @@ -325,14 +329,15 @@ def fetchone(self):

try:
res = next(self)
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(res)
return res
except StopIteration:
return
except Aborted:
self.connection.retry_transaction()
return self.fetchone()
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchone()

def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as
Expand All @@ -343,12 +348,13 @@ def fetchall(self):
res = []
try:
for row in self:
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(row)
res.append(row)
except Aborted:
self.connection.retry_transaction()
return self.fetchall()
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchall()

return res

Expand All @@ -372,14 +378,15 @@ def fetchmany(self, size=None):
for i in range(size):
try:
res = next(self)
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(res)
items.append(res)
except StopIteration:
break
except Aborted:
self.connection.retry_transaction()
return self.fetchmany(size)
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchmany(size)

return items

Expand All @@ -395,38 +402,39 @@ def setoutputsize(self, size, column=None):
"""A no-op, raising an error if the cursor or connection is closed."""
self._raise_if_closed()

def _handle_DQL_with_snapshot(self, snapshot, sql, params):
# Reference
# https://googleapis.dev/python/spanner/latest/session-api.html#google.cloud.spanner_v1.session.Session.execute_sql
sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params)
res = snapshot.execute_sql(
sql, params=params, param_types=get_param_types(params)
)
# Immediately using:
# iter(response)
# here, because this Spanner API doesn't provide
# easy mechanisms to detect when only a single item
# is returned or many, yet mixing results that
# are for .fetchone() with those that would result in
# many items returns a RuntimeError if .fetchone() is
# invoked and vice versa.
self._result_set = res
# Read the first element so that the StreamedResultSet can
# return the metadata after a DQL statement. See issue #155.
self._itr = PeekIterator(self._result_set)
# Unfortunately, Spanner doesn't seem to send back
# information about the number of rows available.
self._row_count = _UNSET_COUNT

def _handle_DQL(self, sql, params):
with self.connection.database.snapshot() as snapshot:
# Reference
# https://googleapis.dev/python/spanner/latest/session-api.html#google.cloud.spanner_v1.session.Session.execute_sql
sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params)
res = snapshot.execute_sql(
sql, params=params, param_types=get_param_types(params)
if self.connection.read_only and not self.connection.autocommit:
# initiate or use the existing multi-use snapshot
self._handle_DQL_with_snapshot(
self.connection.snapshot_checkout(), sql, params
)
if type(res) == int:
self._row_count = res
self._itr = None
else:
# Immediately using:
# iter(response)
# here, because this Spanner API doesn't provide
# easy mechanisms to detect when only a single item
# is returned or many, yet mixing results that
# are for .fetchone() with those that would result in
# many items returns a RuntimeError if .fetchone() is
# invoked and vice versa.
self._result_set = res
# Read the first element so that the StreamedResultSet can
# return the metadata after a DQL statement. See issue #155.
while True:
try:
self._itr = PeekIterator(self._result_set)
break
except Aborted:
self.connection.retry_transaction()
# Unfortunately, Spanner doesn't seem to send back
# information about the number of rows available.
self._row_count = _UNSET_COUNT
else:
# execute with single-use snapshot
with self.connection.database.snapshot() as snapshot:
self._handle_DQL_with_snapshot(snapshot, sql, params)

def __enter__(self):
return self
Expand Down
23 changes: 23 additions & 0 deletions tests/system/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

from google.cloud import spanner_v1
from google.cloud.spanner_dbapi.connection import connect, Connection
from google.cloud.spanner_dbapi.exceptions import ProgrammingError
from google.cloud.spanner_v1 import JsonObject
from . import _helpers


DATABASE_NAME = "dbapi-txn"

DDL_STATEMENTS = (
Expand Down Expand Up @@ -406,3 +408,24 @@ def test_user_agent(shared_instance, dbapi_database):
conn.instance._client._client_info.user_agent
== "dbapi/" + pkg_resources.get_distribution("google-cloud-spanner").version
)


def test_read_only(shared_instance, dbapi_database):
"""
Check that connection set to `read_only=True` uses
ReadOnly transactions.
"""
conn = Connection(shared_instance, dbapi_database, read_only=True)
cur = conn.cursor()

with pytest.raises(ProgrammingError):
cur.execute(
"""
UPDATE contacts
SET first_name = 'updated-first-name'
WHERE first_name = 'first-name'
"""
)

cur.execute("SELECT * FROM contacts")
conn.commit()
Loading

0 comments on commit cd3b950

Please sign in to comment.