Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,6 @@ def translate_value(value, metric_update_proto):
metric_update_proto.integer = to_split_int(value)


def translate_scalar(accumulator, metric_update):
metric_update.scalar = to_json_value(accumulator.value, with_type=True)


def translate_mean(accumulator, metric_update):
if accumulator.count:
metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
Expand All @@ -733,20 +729,43 @@ def _use_fnapi(pipeline_options):


# To enable a counter on the service, add it to this dictionary.
metric_translations = {
cy_combiners.CountCombineFn: ('sum', translate_scalar),
cy_combiners.SumInt64Fn: ('sum', translate_scalar),
cy_combiners.MinInt64Fn: ('min', translate_scalar),
cy_combiners.MaxInt64Fn: ('max', translate_scalar),
cy_combiners.MeanInt64Fn: ('mean', translate_mean),
cy_combiners.SumFloatFn: ('sum', translate_scalar),
cy_combiners.MinFloatFn: ('min', translate_scalar),
cy_combiners.MaxFloatFn: ('max', translate_scalar),
cy_combiners.MeanFloatFn: ('mean', translate_mean),
cy_combiners.AllCombineFn: ('and', translate_scalar),
cy_combiners.AnyCombineFn: ('or', translate_scalar),
structured_counter_translations = {
cy_combiners.CountCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.SumInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MinInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MaxInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_int),
cy_combiners.MeanInt64Fn: (
dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_int),
cy_combiners.SumFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MinFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MIN,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MaxFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MAX,
MetricUpdateTranslators.translate_scalar_counter_float),
cy_combiners.MeanFloatFn: (
dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
MetricUpdateTranslators.translate_scalar_mean_float),
cy_combiners.AllCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.AND,
MetricUpdateTranslators.translate_boolean),
cy_combiners.AnyCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.OR,
MetricUpdateTranslators.translate_boolean),
}


counter_translations = {
cy_combiners.CountCombineFn: (
dataflow.NameAndKind.KindValueValuesEnum.SUM,
Expand Down
92 changes: 42 additions & 50 deletions sdks/python/apache_beam/utils/counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,46 @@
from apache_beam.transforms import cy_combiners


class CounterName(object):
"""Naming information for a counter."""
SYSTEM = object()
USER = object()

def __init__(self, name, stage_name=None, step_name=None,
system_name=None, namespace=None,
origin=None, output_index=None):
self.name = name
self.origin = origin or CounterName.SYSTEM
self.namespace = namespace
self.stage_name = stage_name
self.step_name = step_name
self.system_name = system_name
self.output_index = output_index

def __hash__(self):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need the hash function? What is the default one?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default hash function is related to the id() of the object. This means that without overriding it, this would be True:
hash(CounterName('arg1', 'arg2', 'arg3', 'arg4')) != hash(CounterName('arg1', 'arg2', 'arg3', 'arg4'))

I need counters to be indexable by their name. e.g.

my_d = {CounterName('arg1', 'arg2', 'arg3', 'arg4'): my_counter}
assert my_d[CounterName('arg1', 'arg2', 'arg3', 'arg4')] == my_counter

That's why I need to override the hash function. In fact, I'll add unit tests for this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

return hash((self.name,
self.origin,
self.namespace,
self.stage_name,
self.step_name,
self.system_name,
self.output_index))

def __str__(self):
return '%s' % self._str_internal()

def __repr__(self):
return '<%s at %s>' % (self._str_internal(), hex(id(self)))

def _str_internal(self):
if self.origin == CounterName.USER:
return 'user-%s-%s' % (self.step_name, self.name)
elif self.origin == CounterName.SYSTEM and self.output_index:
return '%s-out%s-%s' % (self.step_name, self.output_index, self.name)
else:
return '%s-%s-%s' % (self.stage_name, self.step_name, self.name)


class Counter(object):
"""A counter aggregates a series of values.

Expand All @@ -52,8 +92,8 @@ def __init__(self, name, combine_fn):
"""Creates a Counter object.

Args:
name: the name of this counter. Typically has three parts:
"step-output-counter".
name: the name of this counter. It may be a string,
or a CounterName object.
combine_fn: the CombineFn to use for aggregation
"""
self.name = name
Expand Down Expand Up @@ -90,10 +130,6 @@ def update(self, value):
self._fast_add_input(value)


# Counters that represent Accumulators have names starting with this
USER_COUNTER_PREFIX = 'user-'


class CounterFactory(object):
"""Keeps track of unique counters."""

Expand Down Expand Up @@ -128,21 +164,6 @@ def get_counter(self, name, combine_fn):
self.counters[name] = counter
return counter

def get_aggregator_counter(self, step_name, aggregator):
"""Returns an AggregationCounter for this step's aggregator.

Passing in the same values will return the same counter.

Args:
step_name: the name of this step.
aggregator: an Aggregator object.
Returns:
A new or existing counter.
"""
return self.get_counter(
'%s%s-%s' % (USER_COUNTER_PREFIX, step_name, aggregator.name),
aggregator.combine_fn)

def get_counters(self):
"""Returns the current set of counters.

Expand All @@ -154,32 +175,3 @@ def get_counters(self):
"""
with self._lock:
return self.counters.values()

def get_aggregator_values(self, aggregator_or_name):
"""Returns dict of step names to values of the aggregator."""
with self._lock:
return get_aggregator_values(
aggregator_or_name, self.counters, lambda counter: counter.value())


def get_aggregator_values(aggregator_or_name, counter_dict,
value_extractor=None):
"""Extracts the named aggregator value from a set of counters.

Args:
aggregator_or_name: an Aggregator object or the name of one.
counter_dict: a dict object of {name: value_wrapper}
value_extractor: a function to convert the value_wrapper into a value.
If None, no extraction is done and the value is return unchanged.

Returns:
dict of step names to values of the aggregator.
"""
name = aggregator_or_name
if value_extractor is None:
value_extractor = lambda x: x
if not isinstance(aggregator_or_name, basestring):
name = aggregator_or_name.name
return {n: value_extractor(c) for n, c in counter_dict.iteritems()
if n.startswith(USER_COUNTER_PREFIX)
and n.endswith('-%s' % name)}