From efa2b7156ebc9212e995a21522458266ec26ed91 Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 29 Aug 2017 15:23:20 -0700 Subject: [PATCH 1/7] Preparing statesampler to work with structured names --- .../portability/maptask_executor_runner.py | 2 +- .../runners/worker/bundle_processor.py | 2 +- .../apache_beam/runners/worker/operations.py | 6 ++-- .../runners/worker/statesampler.pyx | 31 ++++++++++++++----- .../runners/worker/statesampler_fake.py | 2 +- .../runners/worker/statesampler_test.py | 2 +- 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py index a20ceef60fae0..afb96fa09e9d6 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py @@ -130,7 +130,7 @@ def execute_map_tasks(self, ordered_map_tasks): # Create the CounterFactory and StateSampler for this MapTask. # TODO(robertwb): Output counters produced here are currently ignored. counter_factory = CounterFactory() - state_sampler = statesampler.StateSampler('%s-' % ix, counter_factory) + state_sampler = statesampler.StateSampler('%s' % ix, counter_factory) map_executor = operations.SimpleMapTaskExecutor( operation_specs.MapTask( all_operations, 'S%02d' % ix, diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 068aa0af2b049..b69d0027fc6ee 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -193,7 +193,7 @@ def create_execution_tree(self, descriptor): # from StateSampler. counter_factory = counters.CounterFactory() state_sampler = statesampler.StateSampler( - 'fnapi-step%s-' % descriptor.id, counter_factory) + 'fnapi-step%s' % descriptor.id, counter_factory) transform_factory = BeamTransformFactory( descriptor, self.data_channel_factory, counter_factory, state_sampler, diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 1b61f8e5f9beb..1136d99d6f818 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -122,11 +122,11 @@ def __init__(self, operation_name, spec, counter_factory, state_sampler): self.state_sampler = state_sampler self.scoped_start_state = self.state_sampler.scoped_state( - self.operation_name + '-start') + self.operation_name, 'start') self.scoped_process_state = self.state_sampler.scoped_state( - self.operation_name + '-process') + self.operation_name, 'process') self.scoped_finish_state = self.state_sampler.scoped_state( - self.operation_name + '-finish') + self.operation_name, 'finish') # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.scoped_metrics_container = None diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index 3ff6c20aa6902..c7876f9bc4014 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -40,12 +40,13 @@ import time from apache_beam.utils.counters import Counter - +from apache_beam.utils.counters import CounterName cimport cython from cpython cimport pythread from libc.stdint cimport int32_t, int64_t + cdef extern from "Python.h": # This typically requires the GIL, but we synchronize the list modifications # we use this on via our own lock. @@ -110,7 +111,9 @@ cdef class StateSampler(object): def __init__(self, prefix, counter_factory, sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS): - self.prefix = prefix + # TODO(pabloem) - Remove this once the worker has been rebuilt(by 17/10/1). + # We stop using prefixes with included dash. + self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix self.counter_factory = counter_factory self.sampling_period_ms = sampling_period_ms @@ -180,21 +183,33 @@ cdef class StateSampler(object): self.scoped_states_by_index[self.current_state_index].name, self.state_transition_count) - def scoped_state(self, name): + def scoped_state(self, step_name, state_name=None): """Returns a context manager managing transitions for a given state.""" - cdef ScopedState scoped_state = self.scoped_states_by_name.get(name, None) + cdef ScopedState scoped_state + if state_name is None: + # If state_name is None, the worker is still using old style + # msec counters. + counter_name = '%s-%s-msecs' % (self.prefix, step_name) + scoped_state = self.scoped_states_by_name.get(step_name, None) + else: + counter_name = CounterName(state_name+'-msecs', + stage_name=self.prefix, + step_name=step_name) + scoped_state = self.scoped_states_by_name.get(counter_name, None) + if scoped_state is None: - output_counter = self.counter_factory.get_counter( - '%s%s-msecs' % (self.prefix, name), Counter.SUM) + output_counter = self.counter_factory.get_counter(counter_name, + Counter.SUM) new_state_index = len(self.scoped_states_by_index) - scoped_state = ScopedState(self, name, new_state_index, output_counter) + scoped_state = ScopedState(self, counter_name, + new_state_index, output_counter) # Both scoped_states_by_index and scoped_state.nsecs are accessed # by the sampling thread; initialize them under the lock. pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) self.scoped_states_by_index.append(scoped_state) scoped_state.nsecs = 0 pythread.PyThread_release_lock(self.lock) - self.scoped_states_by_name[name] = scoped_state + self.scoped_states_by_name[counter_name] = scoped_state return scoped_state def commit_counters(self): diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py index 88ace8c5ae890..38bb8ac554de3 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py @@ -23,7 +23,7 @@ class StateSampler(object): def __init__(self, *args, **kwargs): pass - def scoped_state(self, name): + def scoped_state(self, step_name, state_name=None): return _FakeScopedState() diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 663cdecdab0c5..2a856104a3184 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -40,7 +40,7 @@ def setUp(self): def test_basic_sampler(self): # Set up state sampler. counter_factory = CounterFactory() - sampler = statesampler.StateSampler('basic-', counter_factory, + sampler = statesampler.StateSampler('basic', counter_factory, sampling_period_ms=1) # Run basic workload transitioning between 3 states. From 1058fcdc58e8a27dca1745c148972e776333811e Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 21 Sep 2017 08:57:00 -0700 Subject: [PATCH 2/7] Support iotarget for ssampler --- sdks/python/apache_beam/runners/worker/statesampler.pyx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index c7876f9bc4014..ddba5f813636a 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -183,7 +183,7 @@ cdef class StateSampler(object): self.scoped_states_by_index[self.current_state_index].name, self.state_transition_count) - def scoped_state(self, step_name, state_name=None): + def scoped_state(self, step_name, state_name=None, io_target=None): """Returns a context manager managing transitions for a given state.""" cdef ScopedState scoped_state if state_name is None: @@ -194,7 +194,8 @@ cdef class StateSampler(object): else: counter_name = CounterName(state_name+'-msecs', stage_name=self.prefix, - step_name=step_name) + step_name=step_name, + io_target=io_target) scoped_state = self.scoped_states_by_name.get(counter_name, None) if scoped_state is None: From 91e7c93555a5bfad6bcecf6c913becbd66165901 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 21 Sep 2017 10:53:57 -0700 Subject: [PATCH 3/7] Improving documentation --- .../apache_beam/runners/worker/statesampler.pyx | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index ddba5f813636a..c061f503ebd36 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -111,7 +111,8 @@ cdef class StateSampler(object): def __init__(self, prefix, counter_factory, sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS): - # TODO(pabloem) - Remove this once the worker has been rebuilt(by 17/10/1). + # TODO(pabloem) - Remove this once all dashed prefixes are removed from + # the worker. # We stop using prefixes with included dash. self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix self.counter_factory = counter_factory @@ -184,7 +185,16 @@ cdef class StateSampler(object): self.state_transition_count) def scoped_state(self, step_name, state_name=None, io_target=None): - """Returns a context manager managing transitions for a given state.""" + """Returns a context manager managing transitions for a given state. + Args: + step_name: A string with the name of the running step. + state_name: A string with the name of the state (e.g. 'process', 'start') + io_target: An IOTargetName object describing the io_target (e.g. writing + or reading to side inputs, shuffle or state). Will often be None. + + Returns: + A ScopedState for the set of step-state-io_target. + """ cdef ScopedState scoped_state if state_name is None: # If state_name is None, the worker is still using old style From 202202e40d97f69345de2ddb0f77066bc68dbf4b Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 25 Sep 2017 10:59:33 -0700 Subject: [PATCH 4/7] Fix missing arg --- sdks/python/apache_beam/runners/worker/statesampler_fake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py index 38bb8ac554de3..5cd0fd25d511d 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py @@ -23,7 +23,7 @@ class StateSampler(object): def __init__(self, *args, **kwargs): pass - def scoped_state(self, step_name, state_name=None): + def scoped_state(self, step_name, state_name=None, io_target=None): return _FakeScopedState() From 9cf96ab6a8360cbda9b472a255a547fb246d09ff Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 25 Sep 2017 15:47:21 -0700 Subject: [PATCH 5/7] Addressing comments --- sdks/python/apache_beam/runners/worker/statesampler.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index c061f503ebd36..9a228b1c25437 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -184,6 +184,8 @@ cdef class StateSampler(object): self.scoped_states_by_index[self.current_state_index].name, self.state_transition_count) + # TODO(pabloem) - Make state_name required once all callers migrate, + # and the legacy path is removed. def scoped_state(self, step_name, state_name=None, io_target=None): """Returns a context manager managing transitions for a given state. Args: @@ -200,9 +202,9 @@ cdef class StateSampler(object): # If state_name is None, the worker is still using old style # msec counters. counter_name = '%s-%s-msecs' % (self.prefix, step_name) - scoped_state = self.scoped_states_by_name.get(step_name, None) + scoped_state = self.scoped_states_by_name.get(counter_name, None) else: - counter_name = CounterName(state_name+'-msecs', + counter_name = CounterName(state_name + '-msecs', stage_name=self.prefix, step_name=step_name, io_target=io_target) From 8bdb6c617836250f1e5244f2c122355e7fe56826 Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 25 Sep 2017 16:13:54 -0700 Subject: [PATCH 6/7] Fix nit --- sdks/python/apache_beam/runners/worker/statesampler.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index 9a228b1c25437..5c5d0641bd615 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -111,7 +111,7 @@ cdef class StateSampler(object): def __init__(self, prefix, counter_factory, sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS): - # TODO(pabloem) - Remove this once all dashed prefixes are removed from + # TODO(pabloem): Remove this once all dashed prefixes are removed from # the worker. # We stop using prefixes with included dash. self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix @@ -184,7 +184,7 @@ cdef class StateSampler(object): self.scoped_states_by_index[self.current_state_index].name, self.state_transition_count) - # TODO(pabloem) - Make state_name required once all callers migrate, + # TODO(pabloem): Make state_name required once all callers migrate, # and the legacy path is removed. def scoped_state(self, step_name, state_name=None, io_target=None): """Returns a context manager managing transitions for a given state. @@ -204,7 +204,7 @@ cdef class StateSampler(object): counter_name = '%s-%s-msecs' % (self.prefix, step_name) scoped_state = self.scoped_states_by_name.get(counter_name, None) else: - counter_name = CounterName(state_name + '-msecs', + counter_name = CounterName(state_name + 6'-msecs', stage_name=self.prefix, step_name=step_name, io_target=io_target) From 29829fe887d17d8c029099db2226e70ad7fa31a8 Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 26 Sep 2017 10:07:18 -0700 Subject: [PATCH 7/7] Fix typo --- sdks/python/apache_beam/runners/worker/statesampler.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index 5c5d0641bd615..c56276303b376 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -204,7 +204,7 @@ cdef class StateSampler(object): counter_name = '%s-%s-msecs' % (self.prefix, step_name) scoped_state = self.scoped_states_by_name.get(counter_name, None) else: - counter_name = CounterName(state_name + 6'-msecs', + counter_name = CounterName(state_name + '-msecs', stage_name=self.prefix, step_name=step_name, io_target=io_target)