Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions sdks/python/apache_beam/internal/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from typing import Union

from apache_beam.internal.metrics.cells import HistogramCellFactory
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricUpdater
from apache_beam.metrics.metric import Metrics as UserMetrics
from apache_beam.metrics.metricbase import Histogram
Expand All @@ -49,12 +50,40 @@
from apache_beam.metrics.cells import MetricCellFactory
from apache_beam.utils.histogram import BucketType

# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
pass

__all__ = ['Metrics']

_LOGGER = logging.getLogger(__name__)


class Metrics(object):
@staticmethod
def counter(urn, labels=None, process_wide=False):
  # type: (str, Optional[Dict[str, str]], bool) -> UserMetrics.DelegatingCounter

  """Obtains or creates a Counter metric identified by a URN and labels.

  The underlying metric name carries no namespace and no name; only ``urn``
  and ``labels`` are set on it.

  Args:
    urn: URN to populate on a MonitoringInfo, when sending to RunnerHarness.
    labels: Labels to populate on a MonitoringInfo
    process_wide: Whether or not the metric is specific to the current bundle
      or should be calculated for the entire process.

  Returns:
    A Counter object.
  """
  metric_name = MetricName(namespace=None, name=None, urn=urn, labels=labels)
  return UserMetrics.DelegatingCounter(
      metric_name, process_wide=process_wide)

@staticmethod
def histogram(namespace, name, bucket_type, logger=None):
# type: (Union[Type, str], str, BucketType, Optional[MetricLogger]) -> Metrics.DelegatingHistogram
Expand Down Expand Up @@ -136,3 +165,64 @@ def log_metrics(self, reset_after_logging=False):
self._last_logging_millis = current_millis
finally:
self._lock.release()


class ServiceCallMetric(object):
"""Metric class which records Service API call metrics.

This class will capture a request count metric for the specified
request_count_urn and base_labels.

When call() is invoked the status must be provided, which will
be converted to a canonical GCP status code, if possible.

TODO(ajamato): Add Request latency metric.
"""
def __init__(self, request_count_urn, base_labels=None):
# type: (str, Optional[Dict[str, str]]) -> None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi - from now on, you can use type hints directly, not as comments (no need to change anything atm)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

self.base_labels = base_labels if base_labels else {}
self.request_count_urn = request_count_urn

def call(self, status):
  # type: (Union[int, str, HttpError]) -> None

  """Records one request with the given status on the request counter.

  Args:
    status: An int, a str or an HttpError. It is normalized with
      convert_to_canonical_status_string() and attached to the counter as
      the status label.
  """
  status_label = self.convert_to_canonical_status_string(status)

  # Work on a copy so that self.base_labels is never mutated; the status
  # label replaces any base label stored under the same key.
  counter_labels = dict(self.base_labels.items())
  counter_labels[monitoring_infos.STATUS_LABEL] = status_label

  Metrics.counter(
      urn=self.request_count_urn, labels=counter_labels,
      process_wide=True).inc()

def convert_to_canonical_status_string(self, status):
  # type: (Union[int, str, HttpError]) -> str

  """Maps a call status onto a canonical GCP status code string.

  Args:
    status: One of
      * a str, which is returned lower-cased;
      * an int, which is treated as an HTTP status code;
      * an HttpError, whose status_code is treated as an HTTP status code.

  Returns:
    The canonical status string when the HTTP status code is in the lookup
    table below, otherwise str() of the HTTP status code (which is 'None'
    when status is none of the types listed above).
  """
  # Strings are passed through; only their case is normalized.
  if isinstance(status, str):
    return status.lower()

  if isinstance(status, int):
    code = status
  elif isinstance(status, HttpError):
    code = int(status.status_code)
  else:
    code = None

  canonical_by_http_code = {
      200: 'ok',
      400: 'out_of_range',
      401: 'unauthenticated',
      403: 'permission_denied',
      404: 'not_found',
      409: 'already_exists',
      429: 'resource_exhausted',
      499: 'cancelled',
      500: 'internal',
      501: 'not_implemented',
      503: 'unavailable',
      504: 'deadline_exceeded'
  }
  canonical = canonical_by_http_code.get(code)
  if canonical is None:
    # Unknown (or missing) code: fall back to its string form.
    return str(code)
  return canonical
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/internal/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
from mock import patch

from apache_beam.internal.metrics.cells import HistogramCellFactory
from apache_beam.internal.metrics.metric import Metrics as InternalMetrics
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metricbase import MetricName
from apache_beam.runners.worker import statesampler
from apache_beam.utils import counters
from apache_beam.utils.histogram import LinearBucket


Expand All @@ -48,5 +53,41 @@ def __eq__(self, other):
Contains('HistogramData(Total count: 1, P99: 2, P90: 2, P50: 2)'))


class MetricsTest(unittest.TestCase):
  """Tests for URN-based counters created through InternalMetrics."""
  def test_create_process_wide(self):
    """A process_wide counter updates only the process-wide container."""
    sampler = statesampler.StateSampler('', counters.CounterFactory())
    statesampler.set_current_tracker(sampler)
    state1 = sampler.scoped_state(
        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))

    try:
      sampler.start()
      # NOTE(review): the assertions are assumed to run inside the scoped
      # state so that current_container() resolves to the 'mystep'
      # container -- confirm against the original indentation.
      with state1:
        urn = "my:custom:urn"
        labels = {'key': 'value'}
        counter = InternalMetrics.counter(
            urn=urn, labels=labels, process_wide=True)
        # Test that if process_wide is set, the update is recorded
        # on the process_wide container.
        counter.inc(10)
        self.assertTrue(isinstance(counter, Metrics.DelegatingCounter))

        del counter

        metric_name = MetricName(None, None, urn=urn, labels=labels)
        # Expect the value to be set on the process-wide container.
        self.assertEqual(
            MetricsEnvironment.process_wide_container().get_counter(
                metric_name).get_cumulative(),
            10)
        # Expect no value set on the current container.
        self.assertEqual(
            MetricsEnvironment.current_container().get_counter(
                metric_name).get_cumulative(),
            0)
    finally:
      sampler.stop()


if __name__ == '__main__':
unittest.main()
36 changes: 34 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@
from apache_beam.internal.http_client import get_new_http
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.internal.metrics.metric import Metrics
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.gcp import bigquery_avro_tools
from apache_beam.io.gcp import resource_identifiers
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.metrics import monitoring_infos
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
Expand Down Expand Up @@ -566,14 +569,43 @@ def _insert_all_rows(
skipInvalidRows=skip_invalid_rows,
# TODO(silviuc): Should have an option for ignoreUnknownValues?
rows=rows))

resource = resource_identifiers.BigQueryTable(
project_id, dataset_id, table_id)

labels = {
# TODO(ajamato): Add Ptransform label.
monitoring_infos.SERVICE_LABEL: 'BigQuery',
# Refer to any method which writes elements to BigQuery in batches
# as "BigQueryBatchWrite". I.e. storage API's insertAll, or future
# APIs introduced.
monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method here would be a BatchWrite? Or a Streaming Insert maybe?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a clarifying comment. I don't want this to be that specific. I.e. if new APIs are introduced, which do the same thing from a dataflow pipeline's perspective "Write batches of elements to BigQuery"

monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
}
service_call_metric = ServiceCallMetric(
request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
base_labels=labels)

started_millis = int(time.time() * 1000)
response = None
try:
response = self.client.tabledata.InsertAll(request)
# response.insertErrors is not [] if errors encountered.
if not response.insertErrors:
service_call_metric.call('ok')
for insert_error in response.insertErrors:
for error in insert_error.errors:
service_call_metric.call(error.reason)
except HttpError as e:
service_call_metric.call(e)
finally:
self._latency_histogram_metric.update(
int(time.time() * 1000) - started_millis)
return not response.insertErrors, response.insertErrors
if response:
return not response.insertErrors, response.insertErrors
return False, []

@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
Expand Down
35 changes: 35 additions & 0 deletions sdks/python/apache_beam/io/gcp/resource_identifiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Helper functions to generate resource label strings for GCP entities.

These can be used on MonitoringInfo 'resource' labels.

See example entities:
https://s.apache.org/beam-gcp-debuggability

For GCP entities, populate the RESOURCE label with the aip.dev/122 format:
https://google.aip.dev/122

If an official GCP format does not exist, try to use the following format.
//whatever.googleapis.com/parents/{parentId}/whatevers/{whateverId}
"""


def BigQueryTable(project_id, dataset_id, table_id):
  """Returns the resource label string for a BigQuery table.

  Args:
    project_id: Project that owns the dataset.
    dataset_id: Dataset that contains the table.
    table_id: The table itself.

  Returns:
    A string of the form
    //bigquery.googleapis.com/projects/<project>/datasets/<dataset>/tables/<table>
  """
  template = '//bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s'
  return template % (project_id, dataset_id, table_id)
22 changes: 16 additions & 6 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ def dec(self, n=1):
def update(self, value):
if cython.compiled:
ivalue = value
# We hold the GIL, no need for another lock.
# Since we hold the GIL, no other lock is needed: the C threads
# won't preempt and interleave each other.
# This assumes that no code accesses the counters directly by
# circumventing the GIL.
self.value += ivalue
else:
with self._lock:
Expand All @@ -134,11 +138,17 @@ def get_cumulative(self):

def to_runner_api_monitoring_info(self, name, transform_id):
from apache_beam.metrics import monitoring_infos
return monitoring_infos.int64_user_counter(
name.namespace,
name.name,
self.get_cumulative(),
ptransform=transform_id)
if not name.urn:
# User counter case.
return monitoring_infos.int64_user_counter(
name.namespace,
name.name,
self.get_cumulative(),
ptransform=transform_id)
else:
# Arbitrary URN case.
return monitoring_infos.int64_counter(
name.urn, self.get_cumulative(), labels=name.labels)


class DistributionCell(MetricCell):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/metrics/execution.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ cimport libc.stdint

from apache_beam.metrics.cells cimport MetricCell


cdef object get_current_tracker


Expand All @@ -36,7 +35,8 @@ cdef object _DEFAULT

cdef class MetricUpdater(object):
cdef _TypedMetricName typed_metric_name
cdef object default
cdef object default_value
cdef bint process_wide # bint is used to represent C++ bool.


cdef class MetricsContainer(object):
Expand Down
Loading