From a5a680cf40f2672dd07f20f8d83f787bbfac0bb5 Mon Sep 17 00:00:00 2001 From: larkee Date: Tue, 5 Jan 2021 17:06:54 +1100 Subject: [PATCH 1/8] feat: add support for logging commit stats --- google/cloud/spanner_v1/batch.py | 22 +++-- google/cloud/spanner_v1/database.py | 40 +++++++- google/cloud/spanner_v1/instance.py | 6 +- google/cloud/spanner_v1/session.py | 8 +- google/cloud/spanner_v1/transaction.py | 23 +++-- tests/unit/test_batch.py | 16 ++-- tests/unit/test_database.py | 88 ++++++++++++++++- tests/unit/test_instance.py | 6 +- tests/unit/test_session.py | 126 +++++++++++++++++++++---- tests/unit/test_transaction.py | 28 ++++-- 10 files changed, 307 insertions(+), 56 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 27cd3c8b58..c04fa6e5a4 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -14,6 +14,7 @@ """Context manager for Cloud Spanner batched writes.""" +from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import Mutation from google.cloud.spanner_v1 import TransactionOptions @@ -123,6 +124,7 @@ class Batch(_BatchBase): """ committed = None + commit_stats = None """Timestamp at which the batch was successfully committed.""" def _check_state(self): @@ -136,9 +138,13 @@ def _check_state(self): if self.committed is not None: raise ValueError("Batch already committed") - def commit(self): + def commit(self, return_commit_stats=False): """Commit mutations to the database. + :type return_commit_stats: bool + :param return_commit_stats: + If true, the response will return commit stats which can be accessed though commit_stats. + :rtype: datetime :returns: timestamp of the committed changes. """ @@ -148,14 +154,16 @@ def commit(self): metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) trace_attributes = {"num_mutations": len(self._mutations)} + request = CommitRequest( + session=self._session.name, + mutations=self._mutations, + single_use_transaction=txn_options, + return_commit_stats=return_commit_stats, + ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): - response = api.commit( - session=self._session.name, - mutations=self._mutations, - single_use_transaction=txn_options, - metadata=metadata, - ) + response = api.commit(request=request, metadata=metadata,) self.committed = response.commit_timestamp + self.commit_stats = response.commit_stats return self.committed def __enter__(self): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index c1c7953648..f7e4fb82c1 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -17,6 +17,7 @@ import copy import functools import grpc +import logging import re import threading @@ -99,7 +100,9 @@ class Database(object): _spanner_api = None - def __init__(self, database_id, instance, ddl_statements=(), pool=None): + def __init__( + self, database_id, instance, ddl_statements=(), pool=None, logger=None + ): self.database_id = database_id self._instance = instance self._ddl_statements = _check_ddl_statements(ddl_statements) @@ -107,6 +110,8 @@ def __init__(self, database_id, instance, ddl_statements=(), pool=None): self._state = None self._create_time = None self._restore_info = None + self.log_commit_stats = False + self._logger = logger if pool is None: pool = BurstyPool() @@ -216,6 +221,31 @@ def ddl_statements(self): """ return self._ddl_statements + @property + def logger(self): + """Logger used by the database. + + The default logger will log commit stats at the log level INFO using + `sys.stderr`. + + :rtype: :class:`logging.Logger` or `None` + :returns: the logger + """ + if self._logger is None: + self._logger = logging.getLogger(self.name) + self._logger.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ch.setFormatter(formatter) + + self._logger.addHandler(ch) + return self._logger + @property def spanner_api(self): """Helper for session-related API calls.""" @@ -624,8 +654,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): """End ``with`` block.""" try: if exc_type is None: - self._batch.commit() + self._batch.commit(return_commit_stats=self._database.log_commit_stats) finally: + if self._database.log_commit_stats: + self._database.logger.info( + "Transaction mutation count: {}".format( + self._batch.commit_stats.mutation_count + ) + ) self._database._pool.put(self._session) diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index b422c57afd..4a3bd21b0d 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -357,7 +357,7 @@ def delete(self): api.delete_instance(name=self.name, metadata=metadata) - def database(self, database_id, ddl_statements=(), pool=None): + def database(self, database_id, ddl_statements=(), pool=None, logger=None): """Factory to create a database within this instance. :type database_id: str @@ -374,7 +374,9 @@ def database(self, database_id, ddl_statements=(), pool=None): :rtype: :class:`~google.cloud.spanner_v1.database.Database` :returns: a database owned by this instance. """ - return Database(database_id, self, ddl_statements=ddl_statements, pool=pool) + return Database( + database_id, self, ddl_statements=ddl_statements, pool=pool, logger=logger + ) def list_databases(self, page_size=None): """List databases for the instance. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 8b33221cf9..b87a13274b 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -349,7 +349,7 @@ def run_in_transaction(self, func, *args, **kw): raise try: - txn.commit() + txn.commit(return_commit_stats=self._database.log_commit_stats) except Aborted as exc: del self._transaction _delay_until_retry(exc, deadline, attempts) @@ -357,6 +357,12 @@ def run_in_transaction(self, func, *args, **kw): del self._transaction raise else: + if self._database.log_commit_stats: + self._database.logger.info( + "Transaction mutation count: {}".format( + txn.commit_stats.mutation_count + ) + ) return return_value diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 51d5826f41..aa2353206f 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -21,6 +21,7 @@ _merge_query_options, _metadata_with_prefix, ) +from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import ExecuteBatchDmlRequest from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import TransactionSelector @@ -42,6 +43,7 @@ class Transaction(_SnapshotBase, _BatchBase): committed = None """Timestamp at which the transaction was successfully committed.""" rolled_back = False + commit_stats = None _multi_use = True _execute_sql_count = 0 @@ -119,9 +121,13 @@ def rollback(self): self.rolled_back = True del self._session._transaction - def commit(self): + def commit(self, return_commit_stats=False): """Commit mutations to the database. + :type return_commit_stats: bool + :param return_commit_stats: + If true, the response will return commit stats which can be accessed though commit_stats. + :rtype: datetime :returns: timestamp of the committed changes. :raises ValueError: if there are no mutations to commit. @@ -132,14 +138,17 @@ def commit(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) trace_attributes = {"num_mutations": len(self._mutations)} + request = CommitRequest( + session=self._session.name, + mutations=self._mutations, + transaction_id=self._transaction_id, + return_commit_stats=return_commit_stats, + ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): - response = api.commit( - session=self._session.name, - mutations=self._mutations, - transaction_id=self._transaction_id, - metadata=metadata, - ) + response = api.commit(request=request, metadata=metadata,) self.committed = response.commit_timestamp + if return_commit_stats: + self.commit_stats = response.commit_stats del self._session._transaction return self.committed diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 7c87f8a82a..187d44913f 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -339,17 +339,17 @@ def __init__(self, **kwargs): self.__dict__.update(**kwargs) def commit( - self, - session, - mutations, - transaction_id="", - single_use_transaction=None, - metadata=None, + self, request=None, metadata=None, ): from google.api_core.exceptions import Unknown - assert transaction_id == "" - self._committed = (session, mutations, single_use_transaction, metadata) + assert request.transaction_id == b"" + self._committed = ( + request.session, + request.mutations, + request.single_use_transaction, + metadata, + ) if self._rpc_error: raise Unknown("error") return self._commit_response diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 175c269d50..f56621efd4 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -104,6 +104,8 @@ def test_ctor_defaults(self): self.assertIs(database._instance, instance) self.assertEqual(list(database.ddl_statements), []) self.assertIsInstance(database._pool, BurstyPool) + self.assertFalse(database.log_commit_stats) + self.assertIsNone(database._logger) # BurstyPool does not create sessions during 'bind()'. self.assertTrue(database._pool._sessions.empty()) @@ -145,6 +147,18 @@ def test_ctor_w_ddl_statements_ok(self): self.assertIs(database._instance, instance) self.assertEqual(list(database.ddl_statements), DDL_STATEMENTS) + def test_ctor_w_explicit_logger(self): + from logging import Logger + + instance = _Instance(self.INSTANCE_NAME) + logger = mock.create_autospec(Logger, instance=True) + database = self._make_one(self.DATABASE_ID, instance, logger=logger) + self.assertEqual(database.database_id, self.DATABASE_ID) + self.assertIs(database._instance, instance) + self.assertEqual(list(database.ddl_statements), []) + self.assertFalse(database.log_commit_stats) + self.assertEqual(database._logger, logger) + def test_from_pb_bad_database_name(self): from google.cloud.spanner_admin_database_v1 import Database @@ -249,6 +263,24 @@ def test_restore_info(self): ) self.assertEqual(database.restore_info, restore_info) + def test_logger_property_default(self): + import logging + + instance = _Instance(self.INSTANCE_NAME) + pool = _Pool() + database = self._make_one(self.DATABASE_ID, instance, pool=pool) + logger = logging.getLogger(database.name) + self.assertEqual(database.logger, logger) + + def test_logger_property_custom(self): + import logging + + instance = _Instance(self.INSTANCE_NAME) + pool = _Pool() + database = self._make_one(self.DATABASE_ID, instance, pool=pool) + logger = database._logger = mock.create_autospec(logging.Logger, instance=True) + self.assertEqual(database.logger, logger) + def test_spanner_api_property_w_scopeless_creds(self): client = _Client() @@ -1263,6 +1295,7 @@ def test_ctor(self): def test_context_mgr_success(self): import datetime + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import TransactionOptions from google.cloud._helpers import UTC @@ -1290,11 +1323,60 @@ def test_context_mgr_success(self): expected_txn_options = TransactionOptions(read_write={}) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=[], + single_use_transaction=expected_txn_options, + ) api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], + ) + + def test_context_mgr_w_commit_stats(self): + import datetime + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import CommitResponse + from google.cloud.spanner_v1 import TransactionOptions + from google.cloud._helpers import UTC + from google.cloud._helpers import _datetime_to_pb_timestamp + from google.cloud.spanner_v1.batch import Batch + + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_pb = _datetime_to_pb_timestamp(now) + commit_stats = CommitResponse.CommitStats(mutation_count=4) + response = CommitResponse(commit_timestamp=now_pb, commit_stats=commit_stats) + database = _Database(self.DATABASE_NAME) + database.log_commit_stats = True + api = database.spanner_api = self._make_spanner_client() + api.commit.return_value = response + pool = database._pool = _Pool() + session = _Session(database) + pool.put(session) + checkout = self._make_one(database) + + with checkout as batch: + self.assertIsNone(pool._session) + self.assertIsInstance(batch, Batch) + self.assertIs(batch._session, session) + + self.assertIs(pool._session, session) + self.assertEqual(batch.committed, now) + + expected_txn_options = TransactionOptions(read_write={}) + + request = CommitRequest( session=self.SESSION_NAME, mutations=[], single_use_transaction=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + return_commit_stats=True + + ) + api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], + ) + + database.logger.info.assert_called_once_with( + "Transaction mutation count: 4" ) def test_context_mgr_failure(self): @@ -1883,10 +1965,14 @@ def __init__(self, name): class _Database(object): + log_commit_stats = False + def __init__(self, name, instance=None): self.name = name self.database_id = name.rsplit("/", 1)[1] self._instance = instance + from logging import Logger + self.logger = mock.create_autospec(Logger, instance=True) class _Pool(object): diff --git a/tests/unit/test_instance.py b/tests/unit/test_instance.py index edd8249c67..c1d02c5728 100644 --- a/tests/unit/test_instance.py +++ b/tests/unit/test_instance.py @@ -484,10 +484,12 @@ def test_database_factory_defaults(self): self.assertIs(database._instance, instance) self.assertEqual(list(database.ddl_statements), []) self.assertIsInstance(database._pool, BurstyPool) + self.assertIsNone(database._logger) pool = database._pool self.assertIs(pool._database, database) def test_database_factory_explicit(self): + from logging import Logger from google.cloud.spanner_v1.database import Database from tests._fixtures import DDL_STATEMENTS @@ -495,9 +497,10 @@ def test_database_factory_explicit(self): instance = self._make_one(self.INSTANCE_ID, client, self.CONFIG_NAME) DATABASE_ID = "database-id" pool = _Pool() + logger = mock.create_autospec(Logger, instance=True) database = instance.database( - DATABASE_ID, ddl_statements=DDL_STATEMENTS, pool=pool + DATABASE_ID, ddl_statements=DDL_STATEMENTS, pool=pool, logger=logger ) self.assertIsInstance(database, Database) @@ -505,6 +508,7 @@ def test_database_factory_explicit(self): self.assertIs(database._instance, instance) self.assertEqual(list(database.ddl_statements), DDL_STATEMENTS) self.assertIs(database._pool, pool) + self.assertIs(database._logger, logger) self.assertIs(pool._bound, database) def test_list_databases(self): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 0a004e3cd0..d377196919 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -65,6 +65,7 @@ def _make_database(name=DATABASE_NAME): database = mock.create_autospec(Database, instance=True) database.name = name + database.log_commit_stats = False return database @staticmethod @@ -769,6 +770,7 @@ def unit_of_work(txn, *args, **kw): def test_run_in_transaction_w_args_w_kwargs_wo_abort(self): import datetime + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import ( Transaction as TransactionPB, @@ -820,15 +822,18 @@ def unit_of_work(txn, *args, **kw): options=expected_options, metadata=[("google-cloud-resource-prefix", database.name)], ) - gax_api.commit.assert_called_once_with( + request = CommitRequest( session=self.SESSION_NAME, mutations=txn._mutations, transaction_id=TRANSACTION_ID, - metadata=[("google-cloud-resource-prefix", database.name)], + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) def test_run_in_transaction_w_commit_error(self): from google.api_core.exceptions import Unknown + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1.transaction import Transaction TABLE_NAME = "citizens" @@ -867,16 +872,19 @@ def unit_of_work(txn, *args, **kw): self.assertEqual(kw, {}) gax_api.begin_transaction.assert_not_called() - gax_api.commit.assert_called_once_with( + request = CommitRequest( session=self.SESSION_NAME, mutations=txn._mutations, transaction_id=TRANSACTION_ID, - metadata=[("google-cloud-resource-prefix", database.name)], + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) def test_run_in_transaction_w_abort_no_retry_metadata(self): import datetime from google.api_core.exceptions import Aborted + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import ( Transaction as TransactionPB, @@ -934,13 +942,16 @@ def unit_of_work(txn, *args, **kw): ] * 2, ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + ) self.assertEqual( gax_api.commit.call_args_list, [ mock.call( - session=self.SESSION_NAME, - mutations=txn._mutations, - transaction_id=TRANSACTION_ID, + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) ] @@ -952,6 +963,7 @@ def test_run_in_transaction_w_abort_w_retry_metadata(self): from google.api_core.exceptions import Aborted from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import ( Transaction as TransactionPB, @@ -1022,13 +1034,16 @@ def unit_of_work(txn, *args, **kw): ] * 2, ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + ) self.assertEqual( gax_api.commit.call_args_list, [ mock.call( - session=self.SESSION_NAME, - mutations=txn._mutations, - transaction_id=TRANSACTION_ID, + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) ] @@ -1040,6 +1055,7 @@ def test_run_in_transaction_w_callback_raises_abort_wo_metadata(self): from google.api_core.exceptions import Aborted from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import ( Transaction as TransactionPB, @@ -1110,11 +1126,13 @@ def unit_of_work(txn, *args, **kw): ] * 2, ) - gax_api.commit.assert_called_once_with( + request = CommitRequest( session=self.SESSION_NAME, mutations=txn._mutations, transaction_id=TRANSACTION_ID, - metadata=[("google-cloud-resource-prefix", database.name)], + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) def test_run_in_transaction_w_abort_w_retry_metadata_deadline(self): @@ -1122,6 +1140,7 @@ def test_run_in_transaction_w_abort_w_retry_metadata_deadline(self): from google.api_core.exceptions import Aborted from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import ( Transaction as TransactionPB, @@ -1197,15 +1216,18 @@ def _time(_results=[1, 1.5]): options=expected_options, metadata=[("google-cloud-resource-prefix", database.name)], ) - gax_api.commit.assert_called_once_with( + request = CommitRequest( session=self.SESSION_NAME, mutations=txn._mutations, transaction_id=TRANSACTION_ID, - metadata=[("google-cloud-resource-prefix", database.name)], + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) def test_run_in_transaction_w_timeout(self): from google.api_core.exceptions import Aborted + from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import ( Transaction as TransactionPB, TransactionOptions, @@ -1275,19 +1297,89 @@ def _time(_results=[1, 2, 4, 8]): ] * 3, ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + ) self.assertEqual( gax_api.commit.call_args_list, [ mock.call( - session=self.SESSION_NAME, - mutations=txn._mutations, - transaction_id=TRANSACTION_ID, + request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) ] * 3, ) + def test_run_in_transaction_w_commit_stats(self): + import datetime + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import CommitResponse + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + TransactionOptions, + ) + from google.cloud._helpers import UTC + from google.cloud._helpers import _datetime_to_pb_timestamp + from google.cloud.spanner_v1.transaction import Transaction + + TABLE_NAME = "citizens" + COLUMNS = ["email", "first_name", "last_name", "age"] + VALUES = [ + ["phred@exammple.com", "Phred", "Phlyntstone", 32], + ["bharney@example.com", "Bharney", "Rhubble", 31], + ] + TRANSACTION_ID = b"FACEDACE" + transaction_pb = TransactionPB(id=TRANSACTION_ID) + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_pb = _datetime_to_pb_timestamp(now) + commit_stats = CommitResponse.CommitStats(mutation_count=4) + response = CommitResponse(commit_timestamp=now_pb) + gax_api = self._make_spanner_api() + gax_api.begin_transaction.return_value = transaction_pb + gax_api.commit.return_value = response + database = self._make_database() + database.log_commit_stats = True + database.spanner_api = gax_api + session = self._make_one(database) + session._session_id = self.SESSION_ID + + called_with = [] + + def unit_of_work(txn, *args, **kw): + called_with.append((txn, args, kw)) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + return 42 + + return_value = session.run_in_transaction(unit_of_work, "abc", some_arg="def") + + self.assertIsNone(session._transaction) + self.assertEqual(len(called_with), 1) + txn, args, kw = called_with[0] + self.assertIsInstance(txn, Transaction) + self.assertEqual(return_value, 42) + self.assertEqual(args, ("abc",)) + self.assertEqual(kw, {"some_arg": "def"}) + + expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + gax_api.begin_transaction.assert_called_once_with( + session=self.SESSION_NAME, + options=expected_options, + metadata=[("google-cloud-resource-prefix", database.name)], + ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + return_commit_stats=True, + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], + ) + database.logger.info.assert_called_once_with("Transaction mutation count: 4") + def test_delay_helper_w_no_delay(self): from google.cloud.spanner_v1.session import _delay_until_retry diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 2c3b45a664..4dc56bfa06 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -309,7 +309,7 @@ def test_commit_w_other_error(self): attributes=dict(TestTransaction.BASE_ATTRIBUTES, num_mutations=1), ) - def _commit_helper(self, mutate=True): + def _commit_helper(self, mutate=True, return_commit_stats=False): import datetime from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1.keyset import KeySet @@ -319,6 +319,8 @@ def _commit_helper(self, mutate=True): keys = [[0], [1], [2]] keyset = KeySet(keys=keys) response = CommitResponse(commit_timestamp=now) + if return_commit_stats: + response.commit_stats.mutation_count = 4 database = _Database() api = database.spanner_api = _FauxSpannerAPI(_commit_response=response) session = _Session(database) @@ -328,7 +330,7 @@ def _commit_helper(self, mutate=True): if mutate: transaction.delete(TABLE_NAME, keyset) - transaction.commit() + transaction.commit(return_commit_stats=return_commit_stats) self.assertEqual(transaction.committed, now) self.assertIsNone(session._transaction) @@ -339,6 +341,9 @@ def _commit_helper(self, mutate=True): self.assertEqual(mutations, transaction._mutations) self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + if return_commit_stats: + self.assertEqual(transaction.commit_stats.mutation_count, 4) + self.assertSpanAttributes( "CloudSpanner.Commit", attributes=dict( @@ -353,6 +358,9 @@ def test_commit_no_mutations(self): def test_commit_w_mutations(self): self._commit_helper(mutate=True) + def test_commit_w_return_commit_stats(self): + self._commit_helper(return_commit_stats=True) + def test__make_params_pb_w_params_wo_param_types(self): session = _Session() transaction = self._make_one(session) @@ -719,13 +727,13 @@ def rollback(self, session=None, transaction_id=None, metadata=None): return self._rollback_response def commit( - self, - session=None, - mutations=None, - transaction_id="", - single_use_transaction=None, - metadata=None, + self, request=None, metadata=None, ): - assert single_use_transaction is None - self._committed = (session, mutations, transaction_id, metadata) + assert not request.single_use_transaction + self._committed = ( + request.session, + request.mutations, + request.transaction_id, + metadata, + ) return self._commit_response From d6e6db4a5cee0307d0abd8c8f2e549e9392080a5 Mon Sep 17 00:00:00 2001 From: larkee Date: Thu, 28 Jan 2021 18:40:14 +1100 Subject: [PATCH 2/8] test: add commit stats to CommitResponse --- tests/unit/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index d377196919..5135a27926 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1336,7 +1336,7 @@ def test_run_in_transaction_w_commit_stats(self): now = datetime.datetime.utcnow().replace(tzinfo=UTC) now_pb = _datetime_to_pb_timestamp(now) commit_stats = CommitResponse.CommitStats(mutation_count=4) - response = CommitResponse(commit_timestamp=now_pb) + response = CommitResponse(commit_timestamp=now_pb, commit_stats=commit_stats) gax_api = self._make_spanner_api() gax_api.begin_transaction.return_value = transaction_pb gax_api.commit.return_value = response From 6b97f72e6059d5f18285f6d44af2e82e70cafed0 Mon Sep 17 00:00:00 2001 From: larkee Date: Thu, 28 Jan 2021 18:41:06 +1100 Subject: [PATCH 3/8] style: fix lint errors --- tests/unit/test_database.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index f56621efd4..8dd1555cfe 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1368,16 +1368,13 @@ def test_context_mgr_w_commit_stats(self): session=self.SESSION_NAME, mutations=[], single_use_transaction=expected_txn_options, - return_commit_stats=True - + return_commit_stats=True, ) api.commit.assert_called_once_with( request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - database.logger.info.assert_called_once_with( - "Transaction mutation count: 4" - ) + database.logger.info.assert_called_once_with("Transaction mutation count: 4") def test_context_mgr_failure(self): from google.cloud.spanner_v1.batch import Batch @@ -1972,6 +1969,7 @@ def __init__(self, name, instance=None): self.database_id = name.rsplit("/", 1)[1] self._instance = instance from logging import Logger + self.logger = mock.create_autospec(Logger, instance=True) From ca867a841fcc45e11f274a3700fbf5c9c3d3db3b Mon Sep 17 00:00:00 2001 From: larkee Date: Thu, 18 Feb 2021 23:55:02 +1100 Subject: [PATCH 4/8] refactor: remove log formatting --- google/cloud/spanner_v1/database.py | 12 +----------- google/cloud/spanner_v1/session.py | 6 +----- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index f7e4fb82c1..363b247842 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -237,12 +237,6 @@ def logger(self): ch = logging.StreamHandler() ch.setLevel(logging.INFO) - - formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) - ch.setFormatter(formatter) - self._logger.addHandler(ch) return self._logger @@ -657,11 +651,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._batch.commit(return_commit_stats=self._database.log_commit_stats) finally: if self._database.log_commit_stats: - self._database.logger.info( - "Transaction mutation count: {}".format( - self._batch.commit_stats.mutation_count - ) - ) + self._database.logger.info(self._batch.commit_stats) self._database._pool.put(self._session) diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index b87a13274b..48f6c0e582 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -358,11 +358,7 @@ def run_in_transaction(self, func, *args, **kw): raise else: if self._database.log_commit_stats: - self._database.logger.info( - "Transaction mutation count: {}".format( - txn.commit_stats.mutation_count - ) - ) + self._database.logger.info(txn.commit_stats) return return_value From b2bf23847bd96705477e61570237d865cce64aeb Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 19 Feb 2021 01:02:31 +1100 Subject: [PATCH 5/8] test: update info arg assertions --- tests/unit/test_database.py | 2 +- tests/unit/test_session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 8dd1555cfe..3192414478 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1374,7 +1374,7 @@ def test_context_mgr_w_commit_stats(self): request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - database.logger.info.assert_called_once_with("Transaction mutation count: 4") + database.logger.info.assert_called_once_with(commit_stats) def test_context_mgr_failure(self): from google.cloud.spanner_v1.batch import Batch diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 5135a27926..2c43228313 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1378,7 +1378,7 @@ def unit_of_work(txn, *args, **kw): gax_api.commit.assert_called_once_with( request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - database.logger.info.assert_called_once_with("Transaction mutation count: 4") + database.logger.info.assert_called_once_with(commit_stats) def test_delay_helper_w_no_delay(self): from google.cloud.spanner_v1.session import _delay_until_retry From 507a4ecacb0fcae1c8c1a9cda8dc310f44f767c0 Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 19 Feb 2021 11:19:51 +1100 Subject: [PATCH 6/8] docs: document logger param --- google/cloud/spanner_v1/database.py | 6 ++++++ google/cloud/spanner_v1/instance.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 363b247842..d73cc4c4a4 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -96,6 +96,12 @@ class Database(object): :param pool: (Optional) session pool to be used by database. If not passed, the database will construct an instance of :class:`~google.cloud.spanner_v1.pool.BurstyPool`. + + :type logger: `logging.Logger` + :param logger: (Optional) a custom logger that is used if `log_commit_stats` + is `True` to log commit statistics. If not passed, a logger + will be created when needed that will log the commit statistics + to stdout. """ _spanner_api = None diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index 4a3bd21b0d..3ade778a28 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -371,6 +371,12 @@ def database(self, database_id, ddl_statements=(), pool=None, logger=None): :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`. :param pool: (Optional) session pool to be used by database. + :type logger: `logging.Logger` + :param logger: (Optional) a custom logger that is used if `log_commit_stats` + is `True` to log commit statistics. If not passed, a logger + will be created when needed that will log the commit statistics + to stdout. + :rtype: :class:`~google.cloud.spanner_v1.database.Database` :returns: a database owned by this instance. """ From 11f62bcde81aea209b4a667cce41aeb7746158a6 Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 19 Feb 2021 11:57:59 +1100 Subject: [PATCH 7/8] refactor: pass CommitStats via extra kwarg --- google/cloud/spanner_v1/database.py | 5 ++++- google/cloud/spanner_v1/session.py | 5 ++++- tests/unit/test_database.py | 4 +++- tests/unit/test_session.py | 4 +++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index d73cc4c4a4..ce64055a22 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -657,7 +657,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._batch.commit(return_commit_stats=self._database.log_commit_stats) finally: if self._database.log_commit_stats: - self._database.logger.info(self._batch.commit_stats) + self._database.logger.info( + "CommitStats: {}".format(self._batch.commit_stats), + extra={"commit_stats": self._batch.commit_stats}, + ) self._database._pool.put(self._session) diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 48f6c0e582..52bb62f67a 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -358,7 +358,10 @@ def run_in_transaction(self, func, *args, **kw): raise else: if self._database.log_commit_stats: - self._database.logger.info(txn.commit_stats) + self._database.logger.info( + "CommitStats: {}".format(txn.commit_stats), + extra={"commit_stats": txn.commit_stats}, + ) return return_value diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 3192414478..d4c8f9451e 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1374,7 +1374,9 @@ def test_context_mgr_w_commit_stats(self): request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - database.logger.info.assert_called_once_with(commit_stats) + database.logger.info.assert_called_once_with( + "CommitStats: mutation_count: 4\n", extra={"commit_stats": commit_stats} + ) def test_context_mgr_failure(self): from google.cloud.spanner_v1.batch import Batch diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 2c43228313..79e36f02ea 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1378,7 +1378,9 @@ def unit_of_work(txn, *args, **kw): gax_api.commit.assert_called_once_with( request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - database.logger.info.assert_called_once_with(commit_stats) + database.logger.info.assert_called_once_with( + "CommitStats: mutation_count: 4\n", extra={"commit_stats": commit_stats} + ) def test_delay_helper_w_no_delay(self): from google.cloud.spanner_v1.session import _delay_until_retry From 3dae32a9ed0027a896197b588e589938cca4f53f Mon Sep 17 00:00:00 2001 From: larkee Date: Mon, 22 Feb 2021 16:51:43 +1100 Subject: [PATCH 8/8] fix: ensure logger is unused if commit fails --- google/cloud/spanner_v1/database.py | 2 +- google/cloud/spanner_v1/session.py | 2 +- tests/unit/test_database.py | 39 +++++++++++++++++- tests/unit/test_session.py | 62 ++++++++++++++++++++++++++++- 4 files changed, 101 insertions(+), 4 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index ce64055a22..6f6100d48e 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -656,7 +656,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: self._batch.commit(return_commit_stats=self._database.log_commit_stats) finally: - if self._database.log_commit_stats: + if self._database.log_commit_stats and self._batch.commit_stats: self._database.logger.info( "CommitStats: {}".format(self._batch.commit_stats), extra={"commit_stats": self._batch.commit_stats}, diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 52bb62f67a..4bec436d7d 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -357,7 +357,7 @@ def run_in_transaction(self, func, *args, **kw): del self._transaction raise else: - if self._database.log_commit_stats: + if self._database.log_commit_stats and txn.commit_stats: self._database.logger.info( "CommitStats: {}".format(txn.commit_stats), extra={"commit_stats": txn.commit_stats}, diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index d4c8f9451e..39ce8ee9d5 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1332,7 +1332,7 @@ def test_context_mgr_success(self): request=request, metadata=[("google-cloud-resource-prefix", database.name)], ) - def test_context_mgr_w_commit_stats(self): + def test_context_mgr_w_commit_stats_success(self): import datetime from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse @@ -1378,6 +1378,43 @@ def test_context_mgr_w_commit_stats(self): "CommitStats: mutation_count: 4\n", extra={"commit_stats": commit_stats} ) + def test_context_mgr_w_commit_stats_error(self): + from google.api_core.exceptions import Unknown + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import TransactionOptions + from google.cloud.spanner_v1.batch import Batch + + database = _Database(self.DATABASE_NAME) + database.log_commit_stats = True + api = database.spanner_api = self._make_spanner_client() + api.commit.side_effect = Unknown("testing") + pool = database._pool = _Pool() + session = _Session(database) + pool.put(session) + checkout = self._make_one(database) + + with self.assertRaises(Unknown): + with checkout as batch: + self.assertIsNone(pool._session) + self.assertIsInstance(batch, Batch) + self.assertIs(batch._session, session) + + self.assertIs(pool._session, session) + + expected_txn_options = TransactionOptions(read_write={}) + + request = CommitRequest( + session=self.SESSION_NAME, + mutations=[], + single_use_transaction=expected_txn_options, + return_commit_stats=True, + ) + api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], + ) + + database.logger.info.assert_not_called() + def test_context_mgr_failure(self): from google.cloud.spanner_v1.batch import Batch diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 79e36f02ea..f80b360b96 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1313,7 +1313,7 @@ def _time(_results=[1, 2, 4, 8]): * 3, ) - def test_run_in_transaction_w_commit_stats(self): + def test_run_in_transaction_w_commit_stats_success(self): import datetime from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import CommitResponse @@ -1382,6 +1382,66 @@ def unit_of_work(txn, *args, **kw): "CommitStats: mutation_count: 4\n", extra={"commit_stats": commit_stats} ) + def test_run_in_transaction_w_commit_stats_error(self): + from google.api_core.exceptions import Unknown + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + TransactionOptions, + ) + from google.cloud.spanner_v1.transaction import Transaction + + TABLE_NAME = "citizens" + COLUMNS = ["email", "first_name", "last_name", "age"] + VALUES = [ + ["phred@exammple.com", "Phred", "Phlyntstone", 32], + ["bharney@example.com", "Bharney", "Rhubble", 31], + ] + TRANSACTION_ID = b"FACEDACE" + transaction_pb = TransactionPB(id=TRANSACTION_ID) + gax_api = self._make_spanner_api() + gax_api.begin_transaction.return_value = transaction_pb + gax_api.commit.side_effect = Unknown("testing") + database = self._make_database() + database.log_commit_stats = True + database.spanner_api = gax_api + session = self._make_one(database) + session._session_id = self.SESSION_ID + + called_with = [] + + def unit_of_work(txn, *args, **kw): + called_with.append((txn, args, kw)) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + return 42 + + with self.assertRaises(Unknown): + session.run_in_transaction(unit_of_work, "abc", some_arg="def") + + self.assertIsNone(session._transaction) + self.assertEqual(len(called_with), 1) + txn, args, kw = called_with[0] + self.assertIsInstance(txn, Transaction) + self.assertEqual(args, ("abc",)) + self.assertEqual(kw, {"some_arg": "def"}) + + expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + gax_api.begin_transaction.assert_called_once_with( + session=self.SESSION_NAME, + options=expected_options, + metadata=[("google-cloud-resource-prefix", database.name)], + ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + return_commit_stats=True, + ) + gax_api.commit.assert_called_once_with( + request=request, metadata=[("google-cloud-resource-prefix", database.name)], + ) + database.logger.info.assert_not_called() + def test_delay_helper_w_no_delay(self): from google.cloud.spanner_v1.session import _delay_until_retry