27 changes: 16 additions & 11 deletions sdks/python/apache_beam/metrics/execution.py
@@ -48,36 +48,38 @@
class MetricKey(object):
"""Key used to identify instance of metric cell.

Metrics are internally keyed by the name of the step they're associated with
and the name of the metric.
Metrics are internally keyed by the name of the step they're associated with,
the name and namespace (if it is a user-defined metric) of the metric,
and any extra label metadata added by the runner-specific metric collection
service.
"""
def __init__(self, step, metric):
def __init__(self, step, metric, labels=None):
"""Initializes ``MetricKey``.

Args:
step: A string with the step this metric cell is part of.
metric: A ``MetricName`` that identifies a metric.
metric: A ``MetricName`` namespace+name that identifies a metric.
labels: An arbitrary set of labels that also identifies the metric.
"""
self.step = step
self.metric = metric
self.labels = labels if labels else dict()

def __eq__(self, other):
return (self.step == other.step and
self.metric == other.metric)
self.metric == other.metric and
self.labels == other.labels)

def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other

def __hash__(self):
return hash((self.step, self.metric))
return hash((self.step, self.metric, frozenset(self.labels)))
Member:
@ajamato @Ardagan - We did not resolve this issue: It seems to me that labels are supposed to be a Python set, but a dict is being used here (see line 66).
If labels are a dictionary, then you need to do frozenset(self.labels.items()) to include the values in the hash and not just the keys.
It seems that labels should be a set. In that case, line 66 should be self.labels = labels if labels else set()
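To make the concern concrete, a minimal sketch (illustrative values only, not from the patch): iterating a dict yields only its keys, so `frozenset(self.labels)` ignores label values, while `frozenset(self.labels.items())` folds them into the hash.

```python
a = {'original_name': 'step1'}
b = {'original_name': 'step2'}

# Hashing only the dict's keys: both label dicts collapse to the same value.
assert frozenset(a) == frozenset(b)  # both are frozenset({'original_name'})

# Including the key/value pairs distinguishes them, as suggested above.
assert frozenset(a.items()) != frozenset(b.items())
```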


def __repr__(self):
return 'MetricKey(step={}, metric={})'.format(
self.step, self.metric)

def __hash__(self):
return hash((self.step, self.metric))
return 'MetricKey(step={}, metric={}, labels={})'.format(
self.step, self.metric, self.labels)


class MetricResult(object):
@@ -122,6 +124,9 @@ def __repr__(self):
return 'MetricResult(key={}, committed={}, attempted={})'.format(
self.key, str(self.committed), str(self.attempted))

def __str__(self):
return repr(self)

@property
def result(self):
"""Short-hand for falling back to attempted metrics if it seems that
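As a quick illustration of what the new `labels` argument adds before the tests below (a sketch using only the classes changed above; the label keys and values are made up):

```python
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import MetricName

# Two keys that differ only in runner-supplied labels now compare unequal.
key_a = MetricKey('step1', MetricName('my.namespace', 'my_counter'),
                  labels={'original_name': 'step1-original'})
key_b = MetricKey('step1', MetricName('my.namespace', 'my_counter'),
                  labels={'original_name': 'another-step'})
print(key_a == key_b)  # False: labels participate in __eq__.
print(key_a)           # repr now includes the labels dict.
```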
48 changes: 48 additions & 0 deletions sdks/python/apache_beam/metrics/execution_test.py
@@ -21,10 +21,58 @@
from builtins import range

from apache_beam.metrics.cells import CellCommitState
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.metricbase import MetricName


class TestMetricKey(unittest.TestCase):
def test_equality_for_key_with_labels(self):
test_labels = {'label1', 'value1'}
test_object = MetricKey(
'step', MetricName('namespace', 'name'), labels=test_labels)
same_labels = MetricKey(
'step', MetricName('namespace', 'name'), labels={'label1', 'value1'})
same_label_reference = MetricKey(
'step', MetricName('namespace', 'name'), labels=test_labels)
self.assertEqual(test_object, same_labels)
self.assertEqual(test_object, same_label_reference)
self.assertEqual(hash(test_object), hash(same_labels))
self.assertEqual(hash(test_object), hash(same_label_reference))

def test_inequality_for_key_with_labels(self):
test_labels = {'label1', 'value1'}
test_object = MetricKey(
'step', MetricName('namespace', 'name'), labels=test_labels)
no_labels = MetricKey('step', MetricName('namespace', 'name'))
diff_label_key = MetricKey(
'step', MetricName('namespace', 'name'), labels={'l1_diff', 'value1'})
diff_label_value = MetricKey(
'step', MetricName('namespace', 'name'), labels={'label1', 'v1_diff'})
self.assertNotEqual(test_object, no_labels)
self.assertNotEqual(test_object, diff_label_key)
self.assertNotEqual(test_object, diff_label_value)
self.assertNotEqual(hash(test_object), hash(no_labels))
self.assertNotEqual(hash(test_object), hash(diff_label_key))
self.assertNotEqual(hash(test_object), hash(diff_label_value))

def test_equality_for_key_with_no_labels(self):
test_object = MetricKey('step', MetricName('namespace', 'name'))
same = MetricKey('step', MetricName('namespace', 'name'))
self.assertEqual(test_object, same)
self.assertEqual(hash(test_object), hash(same))

diff_step = MetricKey('step_diff', MetricName('namespace', 'name'))
diff_namespace = MetricKey('step', MetricName('namespace_diff', 'name'))
diff_name = MetricKey('step', MetricName('namespace', 'name_diff'))
self.assertNotEqual(test_object, diff_step)
self.assertNotEqual(test_object, diff_namespace)
self.assertNotEqual(test_object, diff_name)
self.assertNotEqual(hash(test_object), hash(diff_step))
self.assertNotEqual(hash(test_object), hash(diff_namespace))
self.assertNotEqual(hash(test_object), hash(diff_name))


class TestMetricsContainer(unittest.TestCase):
def test_create_new_counter(self):
mc = MetricsContainer('astep')
61 changes: 45 additions & 16 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -51,6 +51,11 @@ def _get_match(proto, filter_fn):
return query[0]


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STRUCTURED_NAME_LABELS = [
'execution_step', 'original_name', 'output_user_name', 'step']


class DataflowMetrics(MetricResults):
"""Implementation of MetricResults class for the Dataflow runner."""

@@ -97,7 +102,9 @@ def _translate_step_name(self, internal_name):

def _get_metric_key(self, metric):
"""Populate the MetricKey object for a queried metric result."""
try:
step = ""
name = metric.name.name # Always extract a name
try: # Try to extract the user step name.
# If ValueError is thrown within this try-block, it is because of
# one of the following:
# 1. Unable to translate the step name. Only happening with improperly
@@ -108,23 +115,39 @@ def _get_metric_key(self, metric):
step = _get_match(metric.name.context.additionalProperties,
lambda x: x.key == 'step').value
step = self._translate_step_name(step)
except ValueError:
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
name = metric.name.name
except ValueError:
return None

return MetricKey(step, MetricName(namespace, name))

def _populate_metric_results(self, response):
"""Take a list of metrics, and convert it to a list of MetricResult."""
user_metrics = [metric
for metric in response.metrics
if metric.name.origin == 'user']
pass

labels = dict()
for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
# Package everything besides namespace and name into the labels as well,
# including unmodified step names, so that integration tests can match the
# exact unmodified values which come from Dataflow.
return MetricKey(step, MetricName(namespace, name), labels=labels)
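To make the label extraction above concrete, here is a small sketch using the STRUCTURED_NAME_LABELS list defined earlier; the KV namedtuple is only a stand-in for the Dataflow V1b3 MetricStructuredName context entries, not a real API object:

```python
from collections import namedtuple

# Stand-in for one entry of metric.name.context.additionalProperties.
KV = namedtuple('KV', ['key', 'value'])

properties = [KV('step', 's2'), KV('original_name', 's2-original-name'),
              KV('namespace', 'my.namespace'), KV('tentative', 'true')]
labels = {kv.key: kv.value
          for kv in properties
          if kv.key in STRUCTURED_NAME_LABELS}
# labels == {'step': 's2', 'original_name': 's2-original-name'};
# 'namespace' and 'tentative' are not copied into the labels.
```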

def _populate_metrics(self, response, result, user_metrics=False):
"""Move metrics from response to results as MetricResults."""
if user_metrics:
metrics = [metric
for metric in response.metrics
if metric.name.origin == 'user']
else:
metrics = [metric
for metric in response.metrics
if metric.name.origin == 'dataflow/v1b3']

# Get the tentative/committed versions of every metric together.
metrics_by_name = defaultdict(lambda: {})
for metric in user_metrics:
for metric in metrics:
if (metric.name.name.endswith('[MIN]') or
metric.name.name.endswith('[MAX]') or
metric.name.name.endswith('[MEAN]') or
@@ -148,7 +171,6 @@ def _populate_metric_results(self, response):
metrics_by_name[metric_key][tentative_or_committed] = metric

# Now we create the MetricResult elements.
result = []
for metric_key, metric in iteritems(metrics_by_name):
attempted = self._get_metric_value(metric['tentative'])
committed = self._get_metric_value(metric['committed'])
Expand All @@ -158,8 +180,6 @@ def _populate_metric_results(self, response):
attempted=attempted,
committed=committed))

return result

def _get_metric_value(self, metric):
"""Get a metric result object from a MetricUpdate from Dataflow API."""
if metric is None:
@@ -200,9 +220,18 @@ def _get_metrics_from_dataflow(self):
self._cached_metrics = job_metrics
return job_metrics

def all_metrics(self):
Contributor:
Can we expand query() and __populate_metrics() with "query_user" and "query_v1b3" (based on the code, this could be query_system) parameters? That would keep a single method that is more flexible than the two that currently exist.

Default values should keep the code backwards compatible.

Author:
No, we need to discuss that kind of change on the dev list. This PR introduces the bare minimum to support integration testing. To enhance this API, we need to design it on the dev list.
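Purely to illustrate the reviewer's suggestion above (this is not in the PR, and the parameter names come from the review comment, not the codebase):

```python
# Hypothetical sketch of a single origin-parameterized helper; the PR instead
# keeps a boolean user_metrics flag on _populate_metrics.
def _filter_metrics(response, query_user=True, query_v1b3=False):
  origins = set()
  if query_user:
    origins.add('user')
  if query_v1b3:
    origins.add('dataflow/v1b3')
  return [metric for metric in response.metrics
          if metric.name.origin in origins]
```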

"""Return all user and system metrics from the dataflow service."""
metric_results = []
response = self._get_metrics_from_dataflow()
self._populate_metrics(response, metric_results, user_metrics=True)
self._populate_metrics(response, metric_results, user_metrics=False)
return metric_results

def query(self, filter=None):
metric_results = []
response = self._get_metrics_from_dataflow()
metric_results = self._populate_metric_results(response)
self._populate_metrics(response, metric_results, user_metrics=True)
return {self.COUNTERS: [elm for elm in metric_results
if self.matches(filter, elm.key)
and DataflowMetrics._is_counter(elm)],
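As a usage sketch of the query path above: `result` is assumed to be the PipelineResult of a finished Dataflow job and `my_counter` a counter the pipeline declared; both are placeholders, while MetricsFilter and MetricResults are part of the existing Beam metrics API.

```python
from apache_beam.metrics.metric import MetricResults, MetricsFilter

# DataflowMetrics is what result.metrics() returns on the Dataflow runner.
query_result = result.metrics().query(
    MetricsFilter().with_name('my_counter'))
for counter in query_result[MetricResults.COUNTERS]:
  # Each MetricResult key now also carries the Dataflow-provided labels.
  print(counter.key.step, counter.key.labels, counter.result)
```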