diff --git a/README.md b/README.md index 0f741e55..ab3651b4 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,45 @@ Note that execution options are applied lazily - on the `execute()` method call, ReadOnly/ReadWrite mode of a connection can't be changed while a transaction is in progress - first you must commit or rollback it. +### Stale reads +To use the Spanner [Stale Reads](https://cloud.google.com/spanner/docs/reads#perform-stale-read) with SQLAlchemy you can tweak the connection execution options with a wanted staleness value. For example: +```python +# maximum staleness +with engine.connect().execution_options( + read_only=True, + staleness={"max_staleness": datetime.timedelta(seconds=5)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# exact staleness +with engine.connect().execution_options( + read_only=True, + staleness={"exact_staleness": datetime.timedelta(seconds=5)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# min read timestamp +with engine.connect().execution_options( + read_only=True, + staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# read timestamp +with engine.connect().execution_options( + read_only=True, + staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` +Note that the set option will be dropped when the connection is returned back to the pool. + ### DDL and transactions DDL statements are executed outside the regular transactions mechanism, which means DDL statements will not be rolled back on normal transaction rollback. diff --git a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py index d65fc0a9..118672e4 100644 --- a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py +++ b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py @@ -25,7 +25,9 @@ from sqlalchemy import ForeignKeyConstraint, types, util from sqlalchemy.engine.base import Engine from sqlalchemy.engine.default import DefaultDialect, DefaultExecutionContext +from sqlalchemy.event import listens_for from sqlalchemy.ext.compiler import compiles +from sqlalchemy.pool import Pool from sqlalchemy.sql.compiler import ( selectable, DDLCompiler, @@ -38,6 +40,13 @@ from google.cloud import spanner_dbapi from google.cloud.sqlalchemy_spanner._opentelemetry_tracing import trace_call + +@listens_for(Pool, "reset") +def reset_connection(dbapi_conn, connection_record): + """An event of returning a connection back to a pool.""" + dbapi_conn.connection.staleness = None + + # Spanner-to-SQLAlchemy types map _type_map = { "BOOL": types.Boolean, @@ -128,6 +137,10 @@ def pre_exec(self): if read_only is not None: self._dbapi_connection.connection.read_only = read_only + staleness = self.execution_options.get("staleness", None) + if staleness is not None: + self._dbapi_connection.connection.staleness = staleness + class SpannerIdentifierPreparer(IdentifierPreparer): """Identifiers compiler. diff --git a/test/test_suite.py b/test/test_suite.py index ea40bf09..74c5d447 100644 --- a/test/test_suite.py +++ b/test/test_suite.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import timezone +import datetime import decimal import operator import os @@ -975,7 +975,9 @@ def test_row_w_scalar_select(self): eq_( row["somelabel"], - DatetimeWithNanoseconds(2006, 5, 12, 12, 0, 0, tzinfo=timezone.utc), + DatetimeWithNanoseconds( + 2006, 5, 12, 12, 0, 0, tzinfo=datetime.timezone.utc + ), ) @@ -1578,7 +1580,7 @@ class ExecutionOptionsTest(fixtures.TestBase): """ def setUp(self): - self._engine = create_engine(get_db_url()) + self._engine = create_engine(get_db_url(), pool_size=1) self._metadata = MetaData(bind=self._engine) self._table = Table( @@ -1594,3 +1596,15 @@ def test_read_only(self): with self._engine.connect().execution_options(read_only=True) as connection: connection.execute(select(["*"], from_obj=self._table)).fetchall() assert connection.connection.read_only is True + + def test_staleness(self): + with self._engine.connect().execution_options( + read_only=True, staleness={"max_staleness": datetime.timedelta(seconds=5)} + ) as connection: + connection.execute(select(["*"], from_obj=self._table)).fetchall() + assert connection.connection.staleness == { + "max_staleness": datetime.timedelta(seconds=5) + } + + with self._engine.connect() as connection: + assert connection.connection.staleness is None