Skip to content

Commit

Permalink
feat: support stale reads (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Gurov committed Nov 19, 2021
1 parent 5253233 commit d80cb27
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
39 changes: 39 additions & 0 deletions README.md
Expand Up @@ -190,6 +190,45 @@ Note that execution options are applied lazily - on the `execute()` method call,

ReadOnly/ReadWrite mode of a connection can't be changed while a transaction is in progress - first you must commit or rollback it.

### Stale reads
To use the Spanner [Stale Reads](https://cloud.google.com/spanner/docs/reads#perform-stale-read) with SQLAlchemy you can tweak the connection execution options with a wanted staleness value. For example:
```python
# maximum staleness
with engine.connect().execution_options(
read_only=True,
staleness={"max_staleness": datetime.timedelta(seconds=5)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
```

```python
# exact staleness
with engine.connect().execution_options(
read_only=True,
staleness={"exact_staleness": datetime.timedelta(seconds=5)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
```

```python
# min read timestamp
with engine.connect().execution_options(
read_only=True,
staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
```

```python
# read timestamp
with engine.connect().execution_options(
read_only=True,
staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
```
Note that the set option will be dropped when the connection is returned back to the pool.

### DDL and transactions
DDL statements are executed outside the regular transactions mechanism, which means DDL statements will not be rolled back on normal transaction rollback.

Expand Down
13 changes: 13 additions & 0 deletions google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
Expand Up @@ -25,7 +25,9 @@
from sqlalchemy import ForeignKeyConstraint, types, util
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.default import DefaultDialect, DefaultExecutionContext
from sqlalchemy.event import listens_for
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.pool import Pool
from sqlalchemy.sql.compiler import (
selectable,
DDLCompiler,
Expand All @@ -38,6 +40,13 @@
from google.cloud import spanner_dbapi
from google.cloud.sqlalchemy_spanner._opentelemetry_tracing import trace_call


@listens_for(Pool, "reset")
def reset_connection(dbapi_conn, connection_record):
"""An event of returning a connection back to a pool."""
dbapi_conn.connection.staleness = None


# Spanner-to-SQLAlchemy types map
_type_map = {
"BOOL": types.Boolean,
Expand Down Expand Up @@ -128,6 +137,10 @@ def pre_exec(self):
if read_only is not None:
self._dbapi_connection.connection.read_only = read_only

staleness = self.execution_options.get("staleness", None)
if staleness is not None:
self._dbapi_connection.connection.staleness = staleness


class SpannerIdentifierPreparer(IdentifierPreparer):
"""Identifiers compiler.
Expand Down
20 changes: 17 additions & 3 deletions test/test_suite.py
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timezone
import datetime
import decimal
import operator
import os
Expand Down Expand Up @@ -975,7 +975,9 @@ def test_row_w_scalar_select(self):

eq_(
row["somelabel"],
DatetimeWithNanoseconds(2006, 5, 12, 12, 0, 0, tzinfo=timezone.utc),
DatetimeWithNanoseconds(
2006, 5, 12, 12, 0, 0, tzinfo=datetime.timezone.utc
),
)


Expand Down Expand Up @@ -1578,7 +1580,7 @@ class ExecutionOptionsTest(fixtures.TestBase):
"""

def setUp(self):
self._engine = create_engine(get_db_url())
self._engine = create_engine(get_db_url(), pool_size=1)
self._metadata = MetaData(bind=self._engine)

self._table = Table(
Expand All @@ -1594,3 +1596,15 @@ def test_read_only(self):
with self._engine.connect().execution_options(read_only=True) as connection:
connection.execute(select(["*"], from_obj=self._table)).fetchall()
assert connection.connection.read_only is True

def test_staleness(self):
with self._engine.connect().execution_options(
read_only=True, staleness={"max_staleness": datetime.timedelta(seconds=5)}
) as connection:
connection.execute(select(["*"], from_obj=self._table)).fetchall()
assert connection.connection.staleness == {
"max_staleness": datetime.timedelta(seconds=5)
}

with self._engine.connect() as connection:
assert connection.connection.staleness is None

0 comments on commit d80cb27

Please sign in to comment.