From 348f8b373790388cf90175de0c9d410bcb2c44a4 Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Fri, 6 Aug 2021 09:02:38 -0500 Subject: [PATCH 1/6] [BEAM-11986] Spanner write metric --- model/pipeline/src/main/proto/metrics.proto | 8 ++++++-- .../io/gcp/experimental/spannerio.py | 18 ++++++++++++++++++ .../apache_beam/io/gcp/resource_identifiers.py | 10 ++++++++++ .../apache_beam/metrics/monitoring_infos.py | 8 ++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto index eda2d2b053e7..b1e588eb25a5 100644 --- a/model/pipeline/src/main/proto/metrics.proto +++ b/model/pipeline/src/main/proto/metrics.proto @@ -413,8 +413,12 @@ message MonitoringInfo { BIGQUERY_TABLE = 13 [(label_props) = { name: "BIGQUERY_TABLE" }]; BIGQUERY_VIEW = 14 [(label_props) = { name: "BIGQUERY_VIEW" }]; BIGQUERY_QUERY_NAME = 15 [(label_props) = { name: "BIGQUERY_QUERY_NAME" }]; - GCS_BUCKET = 16 [(label_props) = { name: "GCS_BUCKET"}]; - GCS_PROJECT_ID = 17 [(label_props) = { name: "GCS_PROJECT_ID"}]; + GCS_BUCKET = 16 [(label_props) = { name: "GCS_BUCKET" }]; + GCS_PROJECT_ID = 17 [(label_props) = { name: "GCS_PROJECT_ID" }]; + SPANNER_PROJECT_ID = 18 [(label_props) = { name: "SPANNER_PROJECT_ID" }]; + SPANNER_DATABASE_ID = 19 [(label_props) = { name: "SPANNER_DATABASE_ID" }]; + SPANNER_TABLE_ID = 20 [(label_props) = { name: "SPANNER_TABLE_ID" }]; + SPANNER_QUERY_NAME = 21 [(label_props) = { name: "SPANNER_QUERY_NAME" }]; } // A set of key and value labels which define the scope of the metric. 
For diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index bb8c5b8aac8f..2ec040cec9f0 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -189,6 +189,10 @@ from apache_beam.typehints import with_output_types from apache_beam.utils.annotations import experimental +from apache_beam.internal.metrics.metric import ServiceCallMetric +from apache_beam.io.gcp import resource_identifiers +from apache_beam.metrics import monitoring_infos + try: from google.cloud.spanner import Client from google.cloud.spanner import KeySet @@ -1068,6 +1072,19 @@ def __init__(self, spanner_configuration): self._spanner_configuration = spanner_configuration self._db_instance = None self.batches = Metrics.counter(self.__class__, 'SpannerBatches') + table_id = '' + resource = resource_identifiers.SpannerTable( + spanner_configuration.project, spanner_configuration.database, table_id) + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Write', + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project, + monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, + } + self.service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1094,6 +1111,7 @@ def process(self, element): raise ValueError("Unknown operation action: %s" % m.operation) batch_func(**m.kwargs) + self.service_call_metric.call('ok') @with_input_types(typing.Union[MutationGroup, _Mutator]) diff --git a/sdks/python/apache_beam/io/gcp/resource_identifiers.py b/sdks/python/apache_beam/io/gcp/resource_identifiers.py index a85b0696f823..7e1bec3d9822 100644 --- a/sdks/python/apache_beam/io/gcp/resource_identifiers.py +++ 
b/sdks/python/apache_beam/io/gcp/resource_identifiers.py @@ -37,3 +37,13 @@ def BigQueryTable(project_id, dataset_id, table_id): def GoogleCloudStorageBucket(bucket_id): return '//storage.googleapis.com/buckets/%s' % bucket_id + + +def SpannerTable(project_id, database_id, table_id): + return '//spanner.googleapis.com/projects/%s/databases/%s/tables/%s' % ( + project_id, database_id, table_id) + + +def SpannerSqlQuery(project_id, query_name): + return '//spanner.googleapis.com/projects/%s/queries/%s' % ( + project_id, query_name) diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 236ac8148380..9faef039ab9c 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -99,6 +99,14 @@ common_urns.monitoring_info_labels.GCS_PROJECT_ID.label_props.name) GCS_BUCKET_LABEL = ( common_urns.monitoring_info_labels.GCS_BUCKET.label_props.name) +SPANNER_PROJECT_ID = ( + common_urns.monitoring_info_labels.SPANNER_PROJECT_ID.label_props.name) +SPANNER_DATABASE_ID = ( + common_urns.monitoring_info_labels.SPANNER_DATABASE_ID.label_props.name) +SPANNER_TABLE_ID = ( + common_urns.monitoring_info_labels.SPANNER_TABLE_ID.label_props.name) +SPANNER_QUERY_NAME = ( + common_urns.monitoring_info_labels.SPANNER_QUERY_NAME.label_props.name) def extract_counter_value(monitoring_info_proto): From 4de15383c712aa82689967e4fc8bfcc493217d41 Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Fri, 13 Aug 2021 07:21:01 -0500 Subject: [PATCH 2/6] [BEAM-11986] Spanner read metric --- .../io/gcp/experimental/spannerio.py | 101 +++++++++++++++--- .../experimental/spannerio_read_it_test.py | 16 +++ 2 files changed, 105 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 2ec040cec9f0..7bdde5ff813f 100644 ---
a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -288,6 +288,8 @@ class _BeamSpannerConfiguration(namedtuple("_BeamSpannerConfiguration", ["project", "instance", "database", + "table", + "query_name" "credentials", "pool", "snapshot_read_timestamp", @@ -309,7 +311,7 @@ def snapshot_options(self): @with_input_types(ReadOperation, typing.Dict[typing.Any, typing.Any]) @with_output_types(typing.List[typing.Any]) class _NaiveSpannerReadDoFn(DoFn): - def __init__(self, spanner_configuration): + def __init__(self, spanner_configuration, table_metric, query_metric): """ A naive version of Spanner read which uses the transaction API of the cloud spanner. @@ -324,6 +326,8 @@ def __init__(self, spanner_configuration): self._spanner_configuration = spanner_configuration self._snapshot = None self._session = None + self._table_metric = table_metric + self._query_metric = query_metric def _get_session(self): if self._session is None: @@ -372,6 +376,15 @@ def process(self, element, spanner_transaction): for row in transaction_read(**element.kwargs): yield row + table_id = self._spanner_configuration.table + query_name = self._spanner_configuration.query_name + if element.is_sql: + self._query_metric(query_name) + elif element.is_table: + self._table_metric(table_id) + else: + pass + @with_input_types(ReadOperation) @with_output_types(typing.Dict[typing.Any, typing.Any]) @@ -525,8 +538,10 @@ class _ReadFromPartitionFn(DoFn): """ A DoFn to perform reads from the partition. 
""" - def __init__(self, spanner_configuration): + def __init__(self, spanner_configuration, table_metric, query_metric): self._spanner_configuration = spanner_configuration + self._table_metric = table_metric + self._query_metric = query_metric def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -552,6 +567,15 @@ def process(self, element): for row in read_action(element['partitions']): yield row + table_id = self._spanner_configuration.table + query_name = self._spanner_configuration.query_name + if element.get('is_sql'): + self._query_metric(query_name) + elif element.get('is_table'): + self._table_metric(table_id) + else: + pass + def teardown(self): if self._snapshot: self._snapshot.close() @@ -567,7 +591,8 @@ class ReadFromSpanner(PTransform): def __init__(self, project_id, instance_id, database_id, pool=None, read_timestamp=None, exact_staleness=None, credentials=None, sql=None, params=None, param_types=None, # with_query - table=None, columns=None, index="", keyset=None, # with_table + table=None, query_name=None, columns=None, index="", + keyset=None, # with_table read_operations=None, # for read all transaction=None ): @@ -615,6 +640,8 @@ def __init__(self, project_id, instance_id, database_id, pool=None, project=project_id, instance=instance_id, database=database_id, + table=table, + query_name=query_name, credentials=credentials, pool=pool, snapshot_read_timestamp=read_timestamp, @@ -622,6 +649,12 @@ def __init__(self, project_id, instance_id, database_id, pool=None, self._read_operations = read_operations self._transaction = transaction + self.base_labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self._configuration.project, + monitoring_infos.SPANNER_DATABASE_ID: self._configuration.database, + } if self._read_operations is None: if table is not None: @@ -637,6 +670,34 @@ def __init__(self, project_id, instance_id, database_id, 
pool=None, sql=sql, params=params, param_types=param_types) ] + def _table_metric(self, table_id): + database_id = self._configuration.database + project_id = self._configuration.project + resource = resource_identifiers.SpannerTable( + project_id, database_id, table_id) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table_id + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call('ok') + + def _query_metric(self, query_name): + project_id = self._configuration.project + resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: query_name + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call('ok') + def expand(self, pbegin): if self._read_operations is not None and isinstance(pbegin, PBegin): pcoll = pbegin.pipeline | Create(self._read_operations) @@ -660,7 +721,10 @@ def expand(self, pbegin): _CreateReadPartitions(spanner_configuration=self._configuration)) | 'Reshuffle' >> Reshuffle() | 'Read From Partitions' >> ParDo( - _ReadFromPartitionFn(spanner_configuration=self._configuration))) + _ReadFromPartitionFn( + spanner_configuration=self._configuration, + table_metric=self._table_metric, + query_metric=self._query_metric))) else: # reading as naive read, in which we don't make batches and execute the # queries as a single read. 
@@ -668,7 +732,10 @@ def expand(self, pbegin): pcoll | 'Reshuffle' >> Reshuffle().with_input_types(ReadOperation) | 'Perform Read' >> ParDo( - _NaiveSpannerReadDoFn(spanner_configuration=self._configuration), + _NaiveSpannerReadDoFn( + spanner_configuration=self._configuration, + table_metric=self._table_metric, + query_metric=self._query_metric), AsSingleton(self._transaction))) return p @@ -1072,19 +1139,28 @@ def __init__(self, spanner_configuration): self._spanner_configuration = spanner_configuration self._db_instance = None self.batches = Metrics.counter(self.__class__, 'SpannerBatches') - table_id = '' - resource = resource_identifiers.SpannerTable( - spanner_configuration.project, spanner_configuration.database, table_id) - labels = { + self.base_labels = { monitoring_infos.SERVICE_LABEL: 'Spanner', monitoring_infos.METHOD_LABEL: 'Write', - monitoring_infos.RESOURCE_LABEL: resource, monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project, monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, } - self.service_call_metric = ServiceCallMetric( + + def table_write_service_call_metric(self, table_id): + database_id = self._spanner_configuration.database + project_id = self._spanner_configuration.project + resource = resource_identifiers.SpannerTable( + project_id, database_id, table_id) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table_id + } + + service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) + service_call_metric.call('ok') def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1097,6 +1173,7 @@ def process(self, element): self.batches.inc() with self._db_instance.batch() as b: for m in element: + table_id = m.kwargs['table'] if m.operation == WriteMutation._OPERATION_DELETE: batch_func = b.delete elif m.operation == WriteMutation._OPERATION_REPLACE: @@ -1111,7 
+1188,7 @@ def process(self, element): raise ValueError("Unknown operation action: %s" % m.operation) batch_func(**m.kwargs) - self.service_call_metric.call('ok') + self.table_write_service_call_metric(table_id) @with_input_types(typing.Union[MutationGroup, _Mutator]) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py index 0272dac4e772..fd92777b7979 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py @@ -124,6 +124,22 @@ def test_read_via_sql(self): sql="select * from Users") assert_that(r, equal_to(self._data)) + @pytest.mark.it_postcommit + def test_transaction_read_via_table(self): + _LOGGER.info("Spanner Read via table") + with beam.Pipeline(argv=self.args) as p: + transaction = ( + p + | create_transaction(self.project, self.instance, self.TEST_DATABASE)) + r = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + table="Users", + columns=["UserId", "Key"], + transaction=transaction) + assert_that(r, equal_to(self._data)) + @classmethod def tearDownClass(cls): # drop the testing database after the tests From ceebe853ebdd4cffb9185dccd9f726666ca00922 Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Mon, 6 Sep 2021 08:25:10 -0500 Subject: [PATCH 3/6] [BEAM-11986] SpannerIO metrics integration tests --- .../io/gcp/experimental/spannerio.py | 118 +++++++++------ .../experimental/spannerio_read_it_test.py | 136 +++++++++++++++++- .../io/gcp/experimental/spannerio_test.py | 55 +++++++ .../experimental/spannerio_write_it_test.py | 94 ++++++++++++ 4 files changed, 356 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 7bdde5ff813f..0969b2f6156f 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ 
b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -199,6 +199,8 @@ from google.cloud.spanner_v1 import batch from google.cloud.spanner_v1.database import BatchSnapshot from google.cloud.spanner_v1.proto.mutation_pb2 import Mutation + from google.api_core.exceptions import ClientError, GoogleAPICallError + from apitools.base.py.exceptions import HttpError except ImportError: Client = None KeySet = None @@ -289,7 +291,7 @@ class _BeamSpannerConfiguration(namedtuple("_BeamSpannerConfiguration", "instance", "database", "table", - "query_name" + "query_name", "credentials", "pool", "snapshot_read_timestamp", @@ -365,25 +367,32 @@ def process(self, element, spanner_transaction): # getting the transaction from the snapshot's session to run read operation. # with self._snapshot.session().transaction() as transaction: with self._get_session().transaction() as transaction: + table_id = self._spanner_configuration.table + query_name = self._spanner_configuration.query_name + if element.is_sql is True: transaction_read = transaction.execute_sql + metric_action = self._query_metric + metric_id = query_name elif element.is_table is True: transaction_read = transaction.read + metric_action = self._table_metric + metric_id = table_id else: raise ValueError( "ReadOperation is improperly configure: %s" % str(element)) - for row in transaction_read(**element.kwargs): - yield row + try: + for row in transaction_read(**element.kwargs): + yield row - table_id = self._spanner_configuration.table - query_name = self._spanner_configuration.query_name - if element.is_sql: - self._query_metric(query_name) - elif element.is_table: - self._table_metric(table_id) - else: - pass + metric_action(metric_id, 'ok') + except (ClientError, GoogleAPICallError) as e: + metric_action(metric_id, e.code.value) + raise + except HttpError as e: + metric_action(metric_id, e) + raise @with_input_types(ReadOperation) @@ -556,25 +565,33 @@ def process(self, element): self._snapshot = 
BatchSnapshot.from_dict( self._database, element['transaction_info']) + table_id = self._spanner_configuration.table + query_name = self._spanner_configuration.query_name + if element['is_sql'] is True: read_action = self._snapshot.process_query_batch + metric_action = self._query_metric + metric_id = query_name elif element['is_table'] is True: read_action = self._snapshot.process_read_batch + metric_action = self._table_metric + metric_id = table_id else: raise ValueError( "ReadOperation is improperly configure: %s" % str(element)) - for row in read_action(element['partitions']): - yield row + try: + for row in read_action(element['partitions']): + yield row - table_id = self._spanner_configuration.table - query_name = self._spanner_configuration.query_name - if element.get('is_sql'): - self._query_metric(query_name) - elif element.get('is_table'): - self._table_metric(table_id) - else: - pass + metric_action(metric_id, 'ok') + + except (ClientError, GoogleAPICallError) as e: + metric_action(metric_id, e.code.value) + raise + except HttpError as e: + metric_action(metric_id, e) + raise def teardown(self): if self._snapshot: @@ -670,7 +687,7 @@ def __init__(self, project_id, instance_id, database_id, pool=None, sql=sql, params=params, param_types=param_types) ] - def _table_metric(self, table_id): + def _table_metric(self, table_id, status): database_id = self._configuration.database project_id = self._configuration.project resource = resource_identifiers.SpannerTable( @@ -683,9 +700,9 @@ def _table_metric(self, table_id): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call('ok') + service_call_metric.call(str(status)) - def _query_metric(self, query_name): + def _query_metric(self, query_name, status): project_id = self._configuration.project resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) labels = { @@ -696,7 +713,7 @@ def _query_metric(self, 
query_name): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call('ok') + service_call_metric.call(str(status)) def expand(self, pbegin): if self._read_operations is not None and isinstance(pbegin, PBegin): @@ -796,6 +813,8 @@ def __init__( project=project_id, instance=instance_id, database=database_id, + table=None, + query_name=None, credentials=credentials, pool=pool, snapshot_read_timestamp=None, @@ -1146,7 +1165,7 @@ def __init__(self, spanner_configuration): monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, } - def table_write_service_call_metric(self, table_id): + def _table_write_metric(self, table_id, status): database_id = self._spanner_configuration.database project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerTable( @@ -1160,7 +1179,7 @@ def table_write_service_call_metric(self, table_id): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call('ok') + service_call_metric.call(str(status)) def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1171,24 +1190,33 @@ def setup(self): def process(self, element): self.batches.inc() - with self._db_instance.batch() as b: - for m in element: - table_id = m.kwargs['table'] - if m.operation == WriteMutation._OPERATION_DELETE: - batch_func = b.delete - elif m.operation == WriteMutation._OPERATION_REPLACE: - batch_func = b.replace - elif m.operation == WriteMutation._OPERATION_INSERT_OR_UPDATE: - batch_func = b.insert_or_update - elif m.operation == WriteMutation._OPERATION_INSERT: - batch_func = b.insert - elif m.operation == WriteMutation._OPERATION_UPDATE: - batch_func = b.update - else: - raise ValueError("Unknown operation action: %s" % m.operation) - - batch_func(**m.kwargs) - self.table_write_service_call_metric(table_id) + try: + with 
self._db_instance.batch() as b: + for m in element: + table_id = m.kwargs['table'] + if m.operation == WriteMutation._OPERATION_DELETE: + batch_func = b.delete + elif m.operation == WriteMutation._OPERATION_REPLACE: + batch_func = b.replace + elif m.operation == WriteMutation._OPERATION_INSERT_OR_UPDATE: + batch_func = b.insert_or_update + elif m.operation == WriteMutation._OPERATION_INSERT: + batch_func = b.insert + elif m.operation == WriteMutation._OPERATION_UPDATE: + batch_func = b.update + else: + raise ValueError("Unknown operation action: %s" % m.operation) + batch_func(**m.kwargs) + + self._table_write_metric(table_id, 'ok') + except (ClientError, GoogleAPICallError) as e: + self._table_write_metric(table_id, e.code.value) + print('GAPI Error') + raise + except HttpError as e: + self._table_write_metric(table_id, e) + print('HTTP Error') + raise @with_input_types(typing.Union[MutationGroup, _Mutator]) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py index fd92777b7979..93d08b9635b3 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py @@ -33,9 +33,13 @@ # pylint: disable=unused-import try: from google.cloud import spanner + from apache_beam.io.gcp import resource_identifiers from apache_beam.io.gcp.experimental.spannerio import create_transaction from apache_beam.io.gcp.experimental.spannerio import ReadOperation from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner + from apache_beam.metrics import monitoring_infos + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName except ImportError: spanner = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -125,8 +129,20 @@ def test_read_via_sql(self): assert_that(r, equal_to(self._data)) 
@pytest.mark.it_postcommit - def test_transaction_read_via_table(self): - _LOGGER.info("Spanner Read via table") + def test_spanner_table_metrics_ok_call(self): + MetricsEnvironment.process_wide_container().reset() + resource = resource_identifiers.SpannerTable( + self.project, self.TEST_DATABASE, 'Users') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: 'Users', + monitoring_infos.STATUS_LABEL: 'ok' + } + with beam.Pipeline(argv=self.args) as p: transaction = ( p @@ -138,7 +154,123 @@ def test_transaction_read_via_table(self): table="Users", columns=["UserId", "Key"], transaction=transaction) + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + assert_that(r, equal_to(self._data)) + self.assertEqual(metric_value, 1) + + @pytest.mark.it_postcommit + def test_spanner_table_metrics_error_call(self): + MetricsEnvironment.process_wide_container().reset() + resource = resource_identifiers.SpannerTable( + self.project, self.TEST_DATABASE, 'INVALID_TABLE') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: 'INVALID_TABLE', + monitoring_infos.STATUS_LABEL: '404' + } + + with self.assertRaises(Exception): + p = beam.Pipeline(argv=self.args) + transaction = ( + p + | create_transaction(self.project, self.instance, self.TEST_DATABASE)) + _ = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + table="INVALID_TABLE", 
+ columns=["UserId", "Key"], + transaction=transaction) + + res = p.run() + res.wait_until_finish() + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + self.assertEqual(metric_value, 1) + + @pytest.mark.it_postcommit + def test_spanner_sql_metrics_ok_call(self): + MetricsEnvironment.process_wide_container().reset() + resource = resource_identifiers.SpannerSqlQuery(self.project, 'query-1') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: 'query-1', + monitoring_infos.STATUS_LABEL: 'ok' + } + + with beam.Pipeline(argv=self.args) as p: + transaction = ( + p + | create_transaction(self.project, self.instance, self.TEST_DATABASE)) + r = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + sql="select * from Users", + query_name='query-1', + transaction=transaction) + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + assert_that(r, equal_to(self._data)) + self.assertEqual(metric_value, 1) + + @pytest.mark.it_postcommit + def test_spanner_sql_metrics_error_call(self): + MetricsEnvironment.process_wide_container().reset() + resource = resource_identifiers.SpannerSqlQuery(self.project, 'query-2') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: 
'query-2', + monitoring_infos.STATUS_LABEL: '400' + } + + with self.assertRaises(Exception): + p = beam.Pipeline(argv=self.args) + transaction = ( + p + | create_transaction(self.project, self.instance, self.TEST_DATABASE)) + _ = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + sql="select * from NonExistent", + query_name="query-2", + transaction=transaction) + + res = p.run() + res.wait_until_finish() + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + self.assertEqual(metric_value, 1) @classmethod def tearDownClass(cls): diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index 4cdf294580bf..c132a6833ed6 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -41,6 +41,10 @@ from apache_beam.io.gcp.experimental.spannerio import MutationGroup from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner from apache_beam.io.gcp.experimental.spannerio import _BatchFn + from apache_beam.io.gcp import resource_identifiers + from apache_beam.metrics import monitoring_infos + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName except ImportError: spanner = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -375,6 +379,57 @@ def test_display_data(self, *args): self.assertTrue("table" in dd_transaction) self.assertTrue("transaction" in dd_transaction) + def test_read_monitoring_info( + self, mock_batch_snapshot_class, mock_client_class): + database = _generate_database_name() + resource = resource_identifiers.SpannerTable( + TEST_PROJECT_ID, database, 'users') + labels = { + monitoring_infos.SERVICE_LABEL: 
'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_PROJECT_ID: TEST_PROJECT_ID, + monitoring_infos.SPANNER_DATABASE_ID: database, + monitoring_infos.SPANNER_TABLE_ID: 'users', + monitoring_infos.STATUS_LABEL: 'ok' + } + mock_snapshot = mock.MagicMock() + mock_snapshot.generate_read_batches.return_value = [{ + 'read': { + 'table': 'users', + 'keyset': { + 'all': True + }, + 'columns': ['Key', 'Value'], + 'index': '' + }, + 'partition': 'test_partition' + } for _ in range(3)] + mock_snapshot.process_read_batch.side_effect = [ + FAKE_ROWS[0:2], FAKE_ROWS[2:4], FAKE_ROWS[4:] + ] + + _ = [ReadOperation.table("users", ["Key", "Value"])] + pipeline = TestPipeline() + + _ = ( + pipeline + | 'read' >> ReadFromSpanner( + TEST_PROJECT_ID, + TEST_INSTANCE_ID, + database, + table="users", + columns=["Key", "Value"])) + + pipeline.run() + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + self.assertEqual(metric_value, 1) + @unittest.skipIf(spanner is None, 'GCP dependencies are not installed.') @mock.patch('apache_beam.io.gcp.experimental.spannerio.Client') diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py index 7f2c8e30e3fb..ac6736a184be 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py @@ -31,9 +31,13 @@ try: from google.cloud import spanner from google.api_core.exceptions import NotFound + from apache_beam.io.gcp import resource_identifiers from apache_beam.io.gcp.experimental.spannerio import WriteMutation from apache_beam.io.gcp.experimental.spannerio import MutationGroup from apache_beam.io.gcp.experimental.spannerio import 
WriteToSpanner + from apache_beam.metrics import monitoring_infos + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName except ImportError: spanner = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -67,6 +71,11 @@ def _create_database(cls): UserId STRING(256) NOT NULL, Key STRING(1024) ) PRIMARY KEY (UserId)''', + '''CREATE TABLE Albums ( + AlbumId STRING(256) NOT NULL, + Name STRING(1024) + ) PRIMARY KEY (AlbumId) + ''' ]) operation = database.create() _LOGGER.info('Creating database: Done! %s' % str(operation.result())) @@ -181,6 +190,91 @@ def test_spanner_error(self): database_id=self.TEST_DATABASE)) p.run() + @pytest.mark.it_postcommit + def test_spanner_metrics_ok_call(self): + MetricsEnvironment.process_wide_container().reset() + _prefix = 'test_write_batches' + mutations = [ + WriteMutation.insert( + 'Albums', ('AlbumId', 'Name'), + [(_prefix + '1', _prefix + 'inset-1')]), + WriteMutation.insert( + 'Albums', ('AlbumId', 'Name'), + [(_prefix + '2', _prefix + 'inset-2')]), + ] + + resource = resource_identifiers.SpannerTable( + self.project, self.TEST_DATABASE, 'Albums') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Write', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: 'Albums', + monitoring_infos.STATUS_LABEL: 'ok' + } + + p = beam.Pipeline(argv=self.args) + _ = ( + p | beam.Create(mutations) | WriteToSpanner( + project_id=self.project, + instance_id=self.instance, + database_id=self.TEST_DATABASE)) + + res = p.run() + res.wait_until_finish() + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + 
self.assertEqual(metric_value, 1) + + @pytest.mark.it_postcommit + def test_spanner_metrics_error_call(self): + MetricsEnvironment.process_wide_container().reset() + _prefix = 'test_write_batches' + mutations = [ + WriteMutation.insert( + 'Albums', ('AlbumId', 'Name'), + [(_prefix + '3', _prefix + 'inset-3')]), + WriteMutation.insert( + 'Albums', ('AlbumId', 'Name'), + [(_prefix + '3', _prefix + 'inset-3')]), + ] + + resource = resource_identifiers.SpannerTable( + self.project, self.TEST_DATABASE, 'Albums') + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Write', + monitoring_infos.SPANNER_PROJECT_ID: self.project, + monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: 'Albums', + monitoring_infos.STATUS_LABEL: '400' + } + + with self.assertRaises(Exception): + p = beam.Pipeline(argv=self.args) + _ = ( + p | beam.Create(mutations) | WriteToSpanner( + project_id=self.project, + instance_id=self.instance, + database_id=self.TEST_DATABASE)) + + res = p.run() + res.wait_until_finish() + + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + self.assertEqual(metric_value, 1) + @classmethod def tearDownClass(cls): # drop the testing database after the tests From faaef1443fe1b2881432f116d11b7e5334ed1b7d Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Fri, 10 Sep 2021 03:23:02 -0500 Subject: [PATCH 4/6] [BEAM-11986] SpannerIO metrics helper functions --- .../io/gcp/experimental/spannerio.py | 125 +++++++----- .../experimental/spannerio_read_it_test.py | 181 +++++++++++------- .../io/gcp/experimental/spannerio_test.py | 51 ----- .../experimental/spannerio_write_it_test.py | 52 ++--- 4 files changed, 211 insertions(+), 198 deletions(-) diff --git 
a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 0969b2f6156f..dc312aaae2f9 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -313,7 +313,7 @@ def snapshot_options(self): @with_input_types(ReadOperation, typing.Dict[typing.Any, typing.Any]) @with_output_types(typing.List[typing.Any]) class _NaiveSpannerReadDoFn(DoFn): - def __init__(self, spanner_configuration, table_metric, query_metric): + def __init__(self, spanner_configuration): """ A naive version of Spanner read which uses the transaction API of the cloud spanner. @@ -328,8 +328,42 @@ def __init__(self, spanner_configuration, table_metric, query_metric): self._spanner_configuration = spanner_configuration self._snapshot = None self._session = None - self._table_metric = table_metric - self._query_metric = query_metric + self.base_labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self._spanner_configuration. + project, + monitoring_infos.SPANNER_DATABASE_ID: self._spanner_configuration. 
+ database, + } + + def _table_metric(self, table_id, status): + database_id = self._spanner_configuration.database + project_id = self._spanner_configuration.project + resource = resource_identifiers.SpannerTable( + project_id, database_id, table_id) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table_id + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call(str(status)) + + def _query_metric(self, query_name, status): + project_id = self._spanner_configuration.project + resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: query_name + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call(str(status)) def _get_session(self): if self._session is None: @@ -547,10 +581,44 @@ class _ReadFromPartitionFn(DoFn): """ A DoFn to perform reads from the partition. """ - def __init__(self, spanner_configuration, table_metric, query_metric): + def __init__(self, spanner_configuration): self._spanner_configuration = spanner_configuration - self._table_metric = table_metric - self._query_metric = query_metric + self.base_labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: self._spanner_configuration. + project, + monitoring_infos.SPANNER_DATABASE_ID: self._spanner_configuration. 
+ database, + } + + def _table_metric(self, table_id, status): + database_id = self._spanner_configuration.database + project_id = self._spanner_configuration.project + resource = resource_identifiers.SpannerTable( + project_id, database_id, table_id) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table_id + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call(str(status)) + + def _query_metric(self, query_name, status): + project_id = self._spanner_configuration.project + resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) + labels = { + **self.base_labels, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: query_name + } + service_call_metric = ServiceCallMetric( + request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, + base_labels=labels) + service_call_metric.call(str(status)) def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -585,7 +653,6 @@ def process(self, element): yield row metric_action(metric_id, 'ok') - except (ClientError, GoogleAPICallError) as e: metric_action(metric_id, e.code.value) raise @@ -666,12 +733,6 @@ def __init__(self, project_id, instance_id, database_id, pool=None, self._read_operations = read_operations self._transaction = transaction - self.base_labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self._configuration.project, - monitoring_infos.SPANNER_DATABASE_ID: self._configuration.database, - } if self._read_operations is None: if table is not None: @@ -687,34 +748,6 @@ def __init__(self, project_id, instance_id, database_id, pool=None, sql=sql, params=params, param_types=param_types) ] - def _table_metric(self, table_id, status): - database_id = self._configuration.database - project_id = 
self._configuration.project - resource = resource_identifiers.SpannerTable( - project_id, database_id, table_id) - labels = { - **self.base_labels, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_TABLE_ID: table_id - } - service_call_metric = ServiceCallMetric( - request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, - base_labels=labels) - service_call_metric.call(str(status)) - - def _query_metric(self, query_name, status): - project_id = self._configuration.project - resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) - labels = { - **self.base_labels, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_QUERY_NAME: query_name - } - service_call_metric = ServiceCallMetric( - request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, - base_labels=labels) - service_call_metric.call(str(status)) - def expand(self, pbegin): if self._read_operations is not None and isinstance(pbegin, PBegin): pcoll = pbegin.pipeline | Create(self._read_operations) @@ -738,10 +771,7 @@ def expand(self, pbegin): _CreateReadPartitions(spanner_configuration=self._configuration)) | 'Reshuffle' >> Reshuffle() | 'Read From Partitions' >> ParDo( - _ReadFromPartitionFn( - spanner_configuration=self._configuration, - table_metric=self._table_metric, - query_metric=self._query_metric))) + _ReadFromPartitionFn(spanner_configuration=self._configuration))) else: # reading as naive read, in which we don't make batches and execute the # queries as a single read. 
@@ -749,10 +779,7 @@ def expand(self, pbegin): pcoll | 'Reshuffle' >> Reshuffle().with_input_types(ReadOperation) | 'Perform Read' >> ParDo( - _NaiveSpannerReadDoFn( - spanner_configuration=self._configuration, - table_metric=self._table_metric, - query_metric=self._query_metric), + _NaiveSpannerReadDoFn(spanner_configuration=self._configuration), AsSingleton(self._transaction))) return p diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py index 93d08b9635b3..712729875a9d 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py @@ -129,19 +129,8 @@ def test_read_via_sql(self): assert_that(r, equal_to(self._data)) @pytest.mark.it_postcommit - def test_spanner_table_metrics_ok_call(self): + def test_transaction_table_metrics_ok_call(self): MetricsEnvironment.process_wide_container().reset() - resource = resource_identifiers.SpannerTable( - self.project, self.TEST_DATABASE, 'Users') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_TABLE_ID: 'Users', - monitoring_infos.STATUS_LABEL: 'ok' - } with beam.Pipeline(argv=self.args) as p: transaction = ( @@ -155,28 +144,13 @@ def test_spanner_table_metrics_ok_call(self): columns=["UserId", "Key"], transaction=transaction) - metric_name = MetricName( - None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) - metric_value = MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative() - assert_that(r, equal_to(self._data)) - self.assertEqual(metric_value, 1) + self.verify_table_read_call_metric( + self.project, self.TEST_DATABASE, 'Users', 'ok', 1) 
@pytest.mark.it_postcommit - def test_spanner_table_metrics_error_call(self): + def test_transaction_table_metrics_error_call(self): MetricsEnvironment.process_wide_container().reset() - resource = resource_identifiers.SpannerTable( - self.project, self.TEST_DATABASE, 'INVALID_TABLE') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_TABLE_ID: 'INVALID_TABLE', - monitoring_infos.STATUS_LABEL: '404' - } with self.assertRaises(Exception): p = beam.Pipeline(argv=self.args) @@ -194,26 +168,12 @@ def test_spanner_table_metrics_error_call(self): res = p.run() res.wait_until_finish() - metric_name = MetricName( - None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) - metric_value = MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative() - - self.assertEqual(metric_value, 1) + self.verify_table_read_call_metric( + self.project, self.TEST_DATABASE, 'INVALID_TABLE', '404', 1) @pytest.mark.it_postcommit - def test_spanner_sql_metrics_ok_call(self): + def test_transaction_sql_metrics_ok_call(self): MetricsEnvironment.process_wide_container().reset() - resource = resource_identifiers.SpannerSqlQuery(self.project, 'query-1') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_QUERY_NAME: 'query-1', - monitoring_infos.STATUS_LABEL: 'ok' - } with beam.Pipeline(argv=self.args) as p: transaction = ( @@ -227,27 +187,13 @@ def test_spanner_sql_metrics_ok_call(self): query_name='query-1', transaction=transaction) - metric_name = MetricName( - None, None, 
urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) - metric_value = MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative() - assert_that(r, equal_to(self._data)) - self.assertEqual(metric_value, 1) + self.verify_sql_read_call_metric( + self.project, self.TEST_DATABASE, 'query-1', 'ok', 1) @pytest.mark.it_postcommit - def test_spanner_sql_metrics_error_call(self): + def test_transaction_sql_metrics_error_call(self): MetricsEnvironment.process_wide_container().reset() - resource = resource_identifiers.SpannerSqlQuery(self.project, 'query-2') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_QUERY_NAME: 'query-2', - monitoring_infos.STATUS_LABEL: '400' - } with self.assertRaises(Exception): p = beam.Pipeline(argv=self.args) @@ -265,12 +211,117 @@ def test_spanner_sql_metrics_error_call(self): res = p.run() res.wait_until_finish() + self.verify_sql_read_call_metric( + self.project, self.TEST_DATABASE, 'query-2', '400', 1) + + @pytest.mark.it_postcommit + def test_table_metrics_ok_call(self): + MetricsEnvironment.process_wide_container().reset() + + with beam.Pipeline(argv=self.args) as p: + r = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + table="Users", + columns=["UserId", "Key"]) + + assert_that(r, equal_to(self._data)) + self.verify_table_read_call_metric( + self.project, self.TEST_DATABASE, 'Users', 'ok', 1) + + @pytest.mark.it_postcommit + def test_table_metrics_error_call(self): + MetricsEnvironment.process_wide_container().reset() + + with self.assertRaises(Exception): + p = beam.Pipeline(argv=self.args) + _ = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + table="INVALID_TABLE", + columns=["UserId", "Key"]) + + res = p.run() + 
res.wait_until_finish() + + self.verify_table_read_call_metric( + self.project, self.TEST_DATABASE, 'INVALID_TABLE', '404', 1) + + @pytest.mark.it_postcommit + def test_sql_metrics_ok_call(self): + MetricsEnvironment.process_wide_container().reset() + + with beam.Pipeline(argv=self.args) as p: + r = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + sql="select * from Users", + query_name='query-1') + + assert_that(r, equal_to(self._data)) + self.verify_sql_read_call_metric( + self.project, self.TEST_DATABASE, 'query-1', 'ok', 1) + + @pytest.mark.it_postcommit + def test_sql_metrics_error_call(self): + MetricsEnvironment.process_wide_container().reset() + + with self.assertRaises(Exception): + p = beam.Pipeline(argv=self.args) + _ = p | ReadFromSpanner( + self.project, + self.instance, + self.TEST_DATABASE, + sql="select * from NonExistent", + query_name='query-2') + + res = p.run() + res.wait_until_finish() + + self.verify_sql_read_call_metric( + self.project, self.TEST_DATABASE, 'query-2', '400', 1) + + def verify_table_read_call_metric( + self, project, database, table, status, count): + resource = resource_identifiers.SpannerTable(project, database, table) + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: project, + monitoring_infos.SPANNER_DATABASE_ID: database, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table, + monitoring_infos.STATUS_LABEL: status + } + metric_name = MetricName( + None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) + metric_value = MetricsEnvironment.process_wide_container().get_counter( + metric_name).get_cumulative() + + self.assertEqual(metric_value, count) + + def verify_sql_read_call_metric( + self, project, database, query_name, status, count): + resource = resource_identifiers.SpannerSqlQuery(project, query_name) + labels = { + monitoring_infos.SERVICE_LABEL: 
'Spanner', + monitoring_infos.METHOD_LABEL: 'Read', + monitoring_infos.SPANNER_PROJECT_ID: project, + monitoring_infos.SPANNER_DATABASE_ID: database, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_QUERY_NAME: query_name, + monitoring_infos.STATUS_LABEL: status + } + metric_name = MetricName( None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) metric_value = MetricsEnvironment.process_wide_container().get_counter( metric_name).get_cumulative() - self.assertEqual(metric_value, 1) + self.assertEqual(metric_value, count) @classmethod def tearDownClass(cls): diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index c132a6833ed6..fd6d47c20286 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -379,57 +379,6 @@ def test_display_data(self, *args): self.assertTrue("table" in dd_transaction) self.assertTrue("transaction" in dd_transaction) - def test_read_monitoring_info( - self, mock_batch_snapshot_class, mock_client_class): - database = _generate_database_name() - resource = resource_identifiers.SpannerTable( - TEST_PROJECT_ID, database, 'users') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_PROJECT_ID: TEST_PROJECT_ID, - monitoring_infos.SPANNER_DATABASE_ID: database, - monitoring_infos.SPANNER_TABLE_ID: 'users', - monitoring_infos.STATUS_LABEL: 'ok' - } - mock_snapshot = mock.MagicMock() - mock_snapshot.generate_read_batches.return_value = [{ - 'read': { - 'table': 'users', - 'keyset': { - 'all': True - }, - 'columns': ['Key', 'Value'], - 'index': '' - }, - 'partition': 'test_partition' - } for _ in range(3)] - mock_snapshot.process_read_batch.side_effect = [ - FAKE_ROWS[0:2], FAKE_ROWS[2:4], FAKE_ROWS[4:] - ] - - _ = 
[ReadOperation.table("users", ["Key", "Value"])] - pipeline = TestPipeline() - - _ = ( - pipeline - | 'read' >> ReadFromSpanner( - TEST_PROJECT_ID, - TEST_INSTANCE_ID, - database, - table="users", - columns=["Key", "Value"])) - - pipeline.run() - - metric_name = MetricName( - None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) - metric_value = MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative() - - self.assertEqual(metric_value, 1) - @unittest.skipIf(spanner is None, 'GCP dependencies are not installed.') @mock.patch('apache_beam.io.gcp.experimental.spannerio.Client') diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py index ac6736a184be..89a71001fce0 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py @@ -191,7 +191,7 @@ def test_spanner_error(self): p.run() @pytest.mark.it_postcommit - def test_spanner_metrics_ok_call(self): + def test_metrics_ok_call(self): MetricsEnvironment.process_wide_container().reset() _prefix = 'test_write_batches' mutations = [ @@ -203,18 +203,6 @@ def test_spanner_metrics_ok_call(self): [(_prefix + '2', _prefix + 'inset-2')]), ] - resource = resource_identifiers.SpannerTable( - self.project, self.TEST_DATABASE, 'Albums') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Write', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_TABLE_ID: 'Albums', - monitoring_infos.STATUS_LABEL: 'ok' - } - p = beam.Pipeline(argv=self.args) _ = ( p | beam.Create(mutations) | WriteToSpanner( @@ -225,15 +213,11 @@ def test_spanner_metrics_ok_call(self): res = p.run() res.wait_until_finish() - metric_name = MetricName( 
- None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) - metric_value = MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative() - - self.assertEqual(metric_value, 1) + self.verify_write_call_metric( + self.project, self.TEST_DATABASE, 'Albums', 'ok', 1) @pytest.mark.it_postcommit - def test_spanner_metrics_error_call(self): + def test_metrics_error_call(self): MetricsEnvironment.process_wide_container().reset() _prefix = 'test_write_batches' mutations = [ @@ -245,18 +229,6 @@ def test_spanner_metrics_error_call(self): [(_prefix + '3', _prefix + 'inset-3')]), ] - resource = resource_identifiers.SpannerTable( - self.project, self.TEST_DATABASE, 'Albums') - labels = { - monitoring_infos.SERVICE_LABEL: 'Spanner', - monitoring_infos.METHOD_LABEL: 'Write', - monitoring_infos.SPANNER_PROJECT_ID: self.project, - monitoring_infos.SPANNER_DATABASE_ID: self.TEST_DATABASE, - monitoring_infos.RESOURCE_LABEL: resource, - monitoring_infos.SPANNER_TABLE_ID: 'Albums', - monitoring_infos.STATUS_LABEL: '400' - } - with self.assertRaises(Exception): p = beam.Pipeline(argv=self.args) _ = ( @@ -268,12 +240,26 @@ def test_spanner_metrics_error_call(self): res = p.run() res.wait_until_finish() + self.verify_write_call_metric( + self.project, self.TEST_DATABASE, 'Albums', '400', 1) + + def verify_write_call_metric(self, project, database, table, status, count): + resource = resource_identifiers.SpannerTable(project, database, table) + labels = { + monitoring_infos.SERVICE_LABEL: 'Spanner', + monitoring_infos.METHOD_LABEL: 'Write', + monitoring_infos.SPANNER_PROJECT_ID: project, + monitoring_infos.SPANNER_DATABASE_ID: database, + monitoring_infos.RESOURCE_LABEL: resource, + monitoring_infos.SPANNER_TABLE_ID: table, + monitoring_infos.STATUS_LABEL: status + } metric_name = MetricName( None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels) metric_value = MetricsEnvironment.process_wide_container().get_counter( 
metric_name).get_cumulative() - self.assertEqual(metric_value, 1) + self.assertEqual(metric_value, count) @classmethod def tearDownClass(cls): From d5380be55046f31faa2e9406aa00cd2b6ead76e0 Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Mon, 27 Sep 2021 11:52:07 -0500 Subject: [PATCH 5/6] [BEAM-11986] ServiceCallMetric instantiated before API call --- .../io/gcp/experimental/spannerio.py | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 6c0cc62a168b..0022fdbfe32f 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -333,10 +333,10 @@ def __init__(self, spanner_configuration): self.base_labels = { monitoring_infos.SERVICE_LABEL: 'Spanner', monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self._spanner_configuration. - project, - monitoring_infos.SPANNER_DATABASE_ID: self._spanner_configuration. - database, + monitoring_infos.SPANNER_PROJECT_ID: ( + self._spanner_configuration.project), + monitoring_infos.SPANNER_DATABASE_ID: ( + self._spanner_configuration.database), } def _table_metric(self, table_id, status): @@ -588,13 +588,14 @@ def __init__(self, spanner_configuration): self.base_labels = { monitoring_infos.SERVICE_LABEL: 'Spanner', monitoring_infos.METHOD_LABEL: 'Read', - monitoring_infos.SPANNER_PROJECT_ID: self._spanner_configuration. - project, - monitoring_infos.SPANNER_DATABASE_ID: self._spanner_configuration. 
- database, + monitoring_infos.SPANNER_PROJECT_ID: ( + self._spanner_configuration.project), + monitoring_infos.SPANNER_DATABASE_ID: ( + self._spanner_configuration.database), } + self.service_metric = None - def _table_metric(self, table_id, status): + def _table_metric(self, table_id): database_id = self._spanner_configuration.database project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerTable( @@ -607,9 +608,9 @@ def _table_metric(self, table_id, status): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call(str(status)) + return service_call_metric - def _query_metric(self, query_name, status): + def _query_metric(self, query_name): project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerSqlQuery(project_id, query_name) labels = { @@ -620,7 +621,7 @@ def _query_metric(self, query_name, status): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call(str(status)) + return service_call_metric def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -640,12 +641,10 @@ def process(self, element): if element['is_sql'] is True: read_action = self._snapshot.process_query_batch - metric_action = self._query_metric - metric_id = query_name + self.service_metric = self._query_metric(query_name) elif element['is_table'] is True: read_action = self._snapshot.process_read_batch - metric_action = self._table_metric - metric_id = table_id + self.service_metric = self._table_metric(table_id) else: raise ValueError( "ReadOperation is improperly configure: %s" % str(element)) @@ -654,12 +653,12 @@ def process(self, element): for row in read_action(element['partitions']): yield row - metric_action(metric_id, 'ok') + self.service_metric.call('ok') except (ClientError, GoogleAPICallError) as e: - 
metric_action(metric_id, e.code.value) + self.service_metric.call(str(e.code.value)) raise except HttpError as e: - metric_action(metric_id, e) + self.service_metric.call(str(e)) raise def teardown(self): @@ -1193,8 +1192,9 @@ def __init__(self, spanner_configuration): monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project, monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, } + self.service_metric = None - def _table_write_metric(self, table_id, status): + def _table_metric(self, table_id): database_id = self._spanner_configuration.database project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerTable( project_id, database_id, table_id) @@ -1204,11 +1204,10 @@ def _table_write_metric(self, table_id, status): monitoring_infos.RESOURCE_LABEL: resource, monitoring_infos.SPANNER_TABLE_ID: table_id } - service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - service_call_metric.call(str(status)) + return service_call_metric def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1223,6 +1222,8 @@ def process(self, element): with self._db_instance.batch() as b: for m in element: table_id = m.kwargs['table'] + self.service_metric = self._table_metric(table_id) + if m.operation == WriteMutation._OPERATION_DELETE: batch_func = b.delete elif m.operation == WriteMutation._OPERATION_REPLACE: @@ -1237,12 +1238,12 @@ def process(self, element): raise ValueError("Unknown operation action: %s" % m.operation) batch_func(**m.kwargs) - self._table_write_metric(table_id, 'ok') + self.service_metric.call('ok') except (ClientError, GoogleAPICallError) as e: - self._table_write_metric(table_id, e.code.value) + self.service_metric.call(str(e.code.value)) raise except HttpError as e: - self._table_write_metric(table_id, e) + self.service_metric.call(str(e)) raise From dd6624c9a1f6b05ef1a8017b806f5474c5b56c6f Mon Sep 17 00:00:00 2001 From: Miguel Hernandez Date: Wed, 27 Oct 2021 
17:24:51 -0500 Subject: [PATCH 6/6] [BEAM-11986] Skipped metrics tests on Dataflow --- .../experimental/spannerio_read_it_test.py | 24 +++++++++++++++++++ .../experimental/spannerio_write_it_test.py | 6 +++++ 2 files changed, 30 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py index 712729875a9d..bce7a66612cb 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py @@ -130,6 +130,9 @@ def test_read_via_sql(self): @pytest.mark.it_postcommit def test_transaction_table_metrics_ok_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with beam.Pipeline(argv=self.args) as p: @@ -150,6 +153,9 @@ def test_transaction_table_metrics_ok_call(self): @pytest.mark.it_postcommit def test_transaction_table_metrics_error_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with self.assertRaises(Exception): @@ -173,6 +179,9 @@ def test_transaction_table_metrics_error_call(self): @pytest.mark.it_postcommit def test_transaction_sql_metrics_ok_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with beam.Pipeline(argv=self.args) as p: @@ -193,6 +202,9 @@ def test_transaction_sql_metrics_ok_call(self): @pytest.mark.it_postcommit def test_transaction_sql_metrics_error_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with self.assertRaises(Exception): @@ -216,6 +228,9 @@ 
def test_transaction_sql_metrics_error_call(self): @pytest.mark.it_postcommit def test_table_metrics_ok_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with beam.Pipeline(argv=self.args) as p: @@ -232,6 +247,9 @@ def test_table_metrics_ok_call(self): @pytest.mark.it_postcommit def test_table_metrics_error_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with self.assertRaises(Exception): @@ -251,6 +269,9 @@ def test_table_metrics_error_call(self): @pytest.mark.it_postcommit def test_sql_metrics_ok_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with beam.Pipeline(argv=self.args) as p: @@ -267,6 +288,9 @@ def test_sql_metrics_ok_call(self): @pytest.mark.it_postcommit def test_sql_metrics_error_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() with self.assertRaises(Exception): diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py index 89a71001fce0..e2f46d26644b 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py @@ -192,6 +192,9 @@ def test_spanner_error(self): @pytest.mark.it_postcommit def test_metrics_ok_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() _prefix = 'test_write_batches' mutations = [ @@ -218,6 +221,9 @@ 
def test_metrics_ok_call(self): @pytest.mark.it_postcommit def test_metrics_error_call(self): + if 'DirectRunner' not in self.runner_name: + raise unittest.SkipTest('This test only runs with DirectRunner.') + MetricsEnvironment.process_wide_container().reset() _prefix = 'test_write_batches' mutations = [