From 7beb2abf8b9ed33882961ef9e993a3f53a9363df Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 22 Aug 2017 16:53:32 -0700 Subject: [PATCH 1/4] Adding IOTargetName and unittests for CounterName --- sdks/python/apache_beam/utils/counters.py | 63 +++++++++++-- .../python/apache_beam/utils/counters_test.py | 88 +++++++++++++++++++ 2 files changed, 142 insertions(+), 9 deletions(-) create mode 100644 sdks/python/apache_beam/utils/counters_test.py diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 5d029dcc03fa..2b1ddea4e483 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -24,9 +24,43 @@ """ import threading + from apache_beam.transforms import cy_combiners +class IOTargetName(object): + + def __init__(self, + side_input_step_name=None, + side_input_index=None, + original_shuffle_step_name=None): + self.side_input_step_name = side_input_step_name, + self.side_input_index = side_input_index + self.original_shuffle_step_name = original_shuffle_step_name + + @staticmethod + def side_input_id(step_name, input_index): + return IOTargetName(step_name, input_index) + + @staticmethod + def shuffle_id(step_name): + return IOTargetName(original_shuffle_step_name=step_name) + + def _tupl_internal(self): + return (self.side_input_step_name, + self.side_input_index, + self.original_shuffle_step_name) + + def __hash__(self): + return hash(self._tupl_internal()) + + def __eq__(self, other): + return self._tupl_internal() == other._tupl_internal() + + def __ne__(self, other): + return not self._tupl_internal() == other._tupl_internal() + + class CounterName(object): """Naming information for a counter.""" SYSTEM = object() @@ -34,7 +68,7 @@ class CounterName(object): def __init__(self, name, stage_name=None, step_name=None, system_name=None, namespace=None, - origin=None, output_index=None): + origin=None, output_index=None, io_target=None): self.name = name self.origin = origin 
or CounterName.SYSTEM self.namespace = namespace @@ -42,21 +76,32 @@ def __init__(self, name, stage_name=None, step_name=None, self.step_name = step_name self.system_name = system_name self.output_index = output_index + self.io_target = io_target + + def __eq__(self, other): + return self._tupl_internal() == other._tupl_internal() + + def __ne__(self, other): + return not self == other def __hash__(self): - return hash((self.name, - self.origin, - self.namespace, - self.stage_name, - self.step_name, - self.system_name, - self.output_index)) + return hash(self._tupl_internal()) + + def _tupl_internal(self): + return (self.name, + self.origin, + self.namespace, + self.stage_name, + self.step_name, + self.system_name, + self.output_index, + self.io_target) def __str__(self): return '%s' % self._str_internal() def __repr__(self): - return '<%s at %s>' % (self._str_internal(), hex(id(self))) + return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self))) def _str_internal(self): if self.origin == CounterName.USER: diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py new file mode 100644 index 000000000000..41ebcd2ba642 --- /dev/null +++ b/sdks/python/apache_beam/utils/counters_test.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for counters and counter names.""" + +from __future__ import absolute_import + +import unittest + +from apache_beam.utils.counters import CounterName +from apache_beam.utils.counters import IOTargetName + + +class CounterNameTest(unittest.TestCase): + + def test_find_counter(self): + c1 = object() + cn1 = CounterName('cname1', 'stage1', 'step1') + c2 = object() + cn2 = CounterName('cname2', 'stage2', 'step2') + + index = {cn1: c1, cn2: c2} + self.assertEqual(c1, index[cn1]) + self.assertEqual(c2, index[cn2]) + + def test_iotarget_counter_names(self): + self.assertEqual(CounterName('counter_name', + 'stage_name', + 'step_name', + io_target=IOTargetName(1, 's9')), + CounterName('counter_name', + 'stage_name', + 'step_name', + io_target=IOTargetName(1, 's9'))) + self.assertNotEqual(CounterName('counter_name', + 'stage_name', + 'step_name', + io_target=IOTargetName(1, 's')), + CounterName('counter_name', + 'stage_name', + 'step_name', + io_target=IOTargetName(1, 's9'))) + + def test_equal_objects(self): + self.assertEqual(CounterName('counter_name', + 'stage_name', + 'step_name'), + CounterName('counter_name', + 'stage_name', + 'step_name')) + self.assertNotEqual(CounterName('counter_name', + 'stage_name', + 'step_name'), + CounterName('counter_name', + 'stage_name', + 'step_nam')) + + def test_hash_two_objects(self): + self.assertEqual(hash(CounterName('counter_name', + 'stage_name', + 'step_name')), + hash(CounterName('counter_name', + 'stage_name', + 'step_name'))) + self.assertNotEqual(hash(CounterName('counter_name', + 'stage_name', + 'step_name')), + hash(CounterName('counter_name', + 'stage_name', + 'step_nam'))) + + +if __name__ == '__main__': + unittest.main() From f2112e6bb12a09ff3d0febdcab456581cc351bce Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 23 Aug 2017 10:14:28 -0700 Subject: [PATCH 2/4] Addressing comments --- 
sdks/python/apache_beam/utils/counters.py | 38 +++++------------ .../python/apache_beam/utils/counters_test.py | 41 ++++++++----------- 2 files changed, 27 insertions(+), 52 deletions(-) diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 2b1ddea4e483..4a9beba6efc2 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -23,42 +23,26 @@ For internal use only; no backwards-compatibility guarantees. """ +from collections import namedtuple import threading from apache_beam.transforms import cy_combiners -class IOTargetName(object): +"""Information identifying the IO being measured by a counter.""" +IOTargetName = namedtuple('IOTargetName', ['side_input_step_name', + 'side_input_index', + 'original_shuffle_step_name']) - def __init__(self, - side_input_step_name=None, - side_input_index=None, - original_shuffle_step_name=None): - self.side_input_step_name = side_input_step_name, - self.side_input_index = side_input_index - self.original_shuffle_step_name = original_shuffle_step_name - @staticmethod - def side_input_id(step_name, input_index): - return IOTargetName(step_name, input_index) +def side_input_id(step_name, input_index): + """Create an IOTargetName that identifies the reading of a side input.""" + return IOTargetName(step_name, input_index, None) - @staticmethod - def shuffle_id(step_name): - return IOTargetName(original_shuffle_step_name=step_name) - def _tupl_internal(self): - return (self.side_input_step_name, - self.side_input_index, - self.original_shuffle_step_name) - - def __hash__(self): - return hash(self._tupl_internal()) - - def __eq__(self, other): - return self._tupl_internal() == other._tupl_internal() - - def __ne__(self, other): - return not self._tupl_internal() == other._tupl_internal() +def shuffle_id(step_name): + """Create an IOTargetName that identifies a GBK step.""" + return IOTargetName(None, None, step_name) class 
CounterName(object): diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py index 41ebcd2ba642..3f9da9a75e95 100644 --- a/sdks/python/apache_beam/utils/counters_test.py +++ b/sdks/python/apache_beam/utils/counters_test.py @@ -21,53 +21,44 @@ import unittest +from apache_beam.utils import counters from apache_beam.utils.counters import CounterName -from apache_beam.utils.counters import IOTargetName class CounterNameTest(unittest.TestCase): - def test_find_counter(self): - c1 = object() - cn1 = CounterName('cname1', 'stage1', 'step1') - c2 = object() - cn2 = CounterName('cname2', 'stage2', 'step2') - - index = {cn1: c1, cn2: c2} - self.assertEqual(c1, index[cn1]) - self.assertEqual(c2, index[cn2]) - - def test_iotarget_counter_names(self): + def test_equal_objects(self): self.assertEqual(CounterName('counter_name', 'stage_name', - 'step_name', - io_target=IOTargetName(1, 's9')), + 'step_name'), CounterName('counter_name', 'stage_name', - 'step_name', - io_target=IOTargetName(1, 's9'))) + 'step_name')) self.assertNotEqual(CounterName('counter_name', 'stage_name', - 'step_name', - io_target=IOTargetName(1, 's')), + 'step_name'), CounterName('counter_name', 'stage_name', - 'step_name', - io_target=IOTargetName(1, 's9'))) + 'step_nam')) - def test_equal_objects(self): + # Testing objects with an IOTarget. 
self.assertEqual(CounterName('counter_name', 'stage_name', - 'step_name'), + 'step_name', + io_target=counters.side_input_id(1, 's9')), CounterName('counter_name', 'stage_name', - 'step_name')) + 'step_name', + io_target=counters.side_input_id(1, 's9'))) self.assertNotEqual(CounterName('counter_name', 'stage_name', - 'step_name'), + 'step_name', + io_target=counters.side_input_id(1, 's')), CounterName('counter_name', 'stage_name', - 'step_nam')) + 'step_name', + io_target=counters.side_input_id(1, 's9'))) + def test_hash_two_objects(self): self.assertEqual(hash(CounterName('counter_name', From a911331b4b2c5e6e67505fe9da52c1d6114f7a9a Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 23 Aug 2017 12:56:57 -0700 Subject: [PATCH 3/4] Namedtuple CounterName --- sdks/python/apache_beam/utils/counters.py | 50 ++++++++--------------- 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 4a9beba6efc2..2f21256814d7 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -29,7 +29,7 @@ from apache_beam.transforms import cy_combiners -"""Information identifying the IO being measured by a counter.""" +# Information identifying the IO being measured by a counter. 
IOTargetName = namedtuple('IOTargetName', ['side_input_step_name', 'side_input_index', 'original_shuffle_step_name']) @@ -45,41 +45,27 @@ def shuffle_id(step_name): return IOTargetName(None, None, step_name) -class CounterName(object): +_CounterName = namedtuple('_CounterName', ['name', + 'stage_name', + 'step_name', + 'system_name', + 'namespace', + 'origin', + 'output_index', + 'io_target']) + +class CounterName(_CounterName): """Naming information for a counter.""" SYSTEM = object() USER = object() - def __init__(self, name, stage_name=None, step_name=None, - system_name=None, namespace=None, - origin=None, output_index=None, io_target=None): - self.name = name - self.origin = origin or CounterName.SYSTEM - self.namespace = namespace - self.stage_name = stage_name - self.step_name = step_name - self.system_name = system_name - self.output_index = output_index - self.io_target = io_target - - def __eq__(self, other): - return self._tupl_internal() == other._tupl_internal() - - def __ne__(self, other): - return not self == other - - def __hash__(self): - return hash(self._tupl_internal()) - - def _tupl_internal(self): - return (self.name, - self.origin, - self.namespace, - self.stage_name, - self.step_name, - self.system_name, - self.output_index, - self.io_target) + def __new__(cls, name, stage_name=None, step_name=None, + system_name=None, namespace=None, + origin=None, output_index=None, io_target=None): + origin = origin or CounterName.SYSTEM + return super(CounterName, cls).__new__(cls, name, stage_name, step_name, + system_name, namespace, + origin, output_index, io_target) def __str__(self): return '%s' % self._str_internal() From 61c9bf917745b36f4c502251d303f66fcd5a19b3 Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 23 Aug 2017 16:36:33 -0700 Subject: [PATCH 4/4] Fixing lint issue --- sdks/python/apache_beam/utils/counters.py | 1 + sdks/python/apache_beam/utils/counters_test.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 2f21256814d7..08685aae2759 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -54,6 +54,7 @@ def shuffle_id(step_name): 'output_index', 'io_target']) + class CounterName(_CounterName): """Naming information for a counter.""" SYSTEM = object() diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py index 3f9da9a75e95..37cab881d8b3 100644 --- a/sdks/python/apache_beam/utils/counters_test.py +++ b/sdks/python/apache_beam/utils/counters_test.py @@ -59,7 +59,6 @@ def test_equal_objects(self): 'step_name', io_target=counters.side_input_id(1, 's9'))) - def test_hash_two_objects(self): self.assertEqual(hash(CounterName('counter_name', 'stage_name',