From 1339474d461cc6deaebb4a7668a24e2a3e56af63 Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 4 Oct 2017 15:55:28 -0700 Subject: [PATCH 1/2] Adding tracking for bytes and msecs spent while reading from side inputs --- .../apache_beam/runners/worker/opcounters.py | 54 +++++++++++++++++++ .../apache_beam/runners/worker/operations.py | 11 +++- .../apache_beam/runners/worker/sideinputs.py | 50 ++++++++++++++--- .../runners/worker/sideinputs_test.py | 7 +++ .../runners/worker/statesampler.pyx | 7 +++ .../runners/worker/statesampler_fake.py | 19 +++++-- 6 files changed, 135 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index f4ba6b9a9a88..8109af8ec120 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -21,10 +21,12 @@ from __future__ import absolute_import +import logging import math import random from apache_beam.utils.counters import Counter +from apache_beam.utils.counters import CounterName # This module is experimental. No backwards-compatibility guarantees. @@ -42,6 +44,58 @@ def value(self): return self._value +class TransformIoCounter(object): + + def add_bytes_read(self, n): + pass + + def __enter__(self): + self.enter() + + def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback): + self.exit() + + def enter(self): + pass + + def exit(self): + pass + + def check_step(self): + pass + + +class SideInputReadCounter(TransformIoCounter): + + def __init__(self, counter_factory, state_sampler, io_target): + self._counter_factory = counter_factory + self._state_sampler = state_sampler + self._bytes_read_cache = 0 + self.io_target = io_target + self.check_step() + + def check_step(self): + current_state = self._state_sampler.current_state() + operation_name = current_state.name.step_name + self.scoped_state = self._state_sampler.scoped_state( + operation_name, 'read-sideinput', io_target=self.io_target) + self.bytes_read_counter = self._counter_factory.get_counter( + CounterName('bytes-read', + step_name=operation_name, + io_target=self.io_target), + Counter.SUM) + + def add_bytes_read(self, n): + if n > 0: + self.bytes_read_counter.update(n) + + def enter(self): + self.scoped_state.__enter__() + + def exit(self): + self.scoped_state.__exit__(None, None, None) + + class OperationCounters(object): """The set of basic counters to attach to an Operation.""" diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index ed3f3b8f466c..132a61fb131f 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -42,6 +42,7 @@ from apache_beam.transforms.combiners import PhasedCombineFnExecutor from apache_beam.transforms.combiners import curry_combine_fn from apache_beam.transforms.window import GlobalWindows +from apache_beam.utils import counters from apache_beam.utils.windowed_value import WindowedValue # Allow some "pure mode" declarations. @@ -281,7 +282,7 @@ def _read_side_inputs(self, tags_and_types): # Note that for each tag there could be several read operations in the # specification. This can happen for instance if the source has been # sharded into several files. - for side_tag, view_class, view_options in tags_and_types: + for i, (side_tag, view_class, view_options) in enumerate(tags_and_types): sources = [] # Using the side_tag in the lambda below will trigger a pylint warning. # However in this case it is fine because the lambda is used right away @@ -293,7 +294,13 @@ def _read_side_inputs(self, tags_and_types): if not isinstance(si, operation_specs.WorkerSideInputSource): raise NotImplementedError('Unknown side input type: %r' % si) sources.append(si.source) - iterator_fn = sideinputs.get_iterator_fn_for_sources(sources) + + si_counter = opcounters.SideInputReadCounter( + self.counter_factory, self.state_sampler, + # Inputs are 1-indexed, so we add 1 to i in the side input id + counters.side_input_id(self.operation_name, i+1)) + iterator_fn = sideinputs.get_iterator_fn_for_sources( + sources, read_counter=si_counter) # Backwards compatibility for pre BEAM-733 SDKs. if isinstance(view_options, tuple): diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index bdf9f4e71f5e..ec5117c55533 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -24,6 +24,7 @@ import traceback from apache_beam.io import iobase +from apache_beam.runners.worker import opcounters from apache_beam.transforms import window # This module is experimental. No backwards-compatibility guarantees. @@ -51,9 +52,12 @@ class PrefetchingSourceSetIterable(object): """Value iterator that reads concurrently from a set of sources.""" def __init__(self, sources, - max_reader_threads=MAX_SOURCE_READER_THREADS): + max_reader_threads=MAX_SOURCE_READER_THREADS, + read_counter=None): self.sources = sources self.num_reader_threads = min(max_reader_threads, len(self.sources)) + self.read_counter = read_counter or opcounters.TransformIoCounter() + # self.read_counter = opcounters.TransformIoCounter() # Queue for sources that are to be read. self.sources_queue = Queue.Queue() @@ -78,6 +82,14 @@ def _start_reader_threads(self): t.start() self.reader_threads.append(t) + def _get_source_position(self, range_tracker=None, reader=None): + if reader: + return reader.get_progress().position.byte_offset + else: + return range_tracker.position_at_fraction( + range_tracker.fraction_consumed()) if range_tracker else 0 + + def _reader_thread(self): # pylint: disable=too-many-nested-blocks try: @@ -85,22 +97,37 @@ def _reader_thread(self): try: source = self.sources_queue.get_nowait() if isinstance(source, iobase.BoundedSource): - for value in source.read(source.get_range_tracker(None, None)): + rt = source.get_range_tracker(None, None) + initial_position = self._get_source_position(range_tracker=rt) + for value in source.read(rt): if self.has_errored: # If any reader has errored, just return. return + + current_position = self._get_source_position(range_tracker=rt) + consumed_bytes = current_position - initial_position + self.read_counter.add_bytes_read(consumed_bytes) + initial_position = initial_position + consumed_bytes + if isinstance(value, window.WindowedValue): self.element_queue.put(value) else: self.element_queue.put(_globally_windowed(value)) else: - # Native dataflow source. + # Native dataflow source / testing FakeSource with source.reader() as reader: + initial_offset = self._get_source_position(reader=reader) + returns_windowed_values = reader.returns_windowed_values for value in reader: if self.has_errored: - # If any reader has errored, just return. + # If any reader has errored, just return.` return + + new_offset = self._get_source_position(reader=reader) + self.read_counter.add_bytes_read(new_offset - initial_offset) + initial_offset = new_offset + if returns_windowed_values: self.element_queue.put(value) else: @@ -128,7 +155,14 @@ def __iter__(self): num_readers_finished = 0 try: while True: - element = self.element_queue.get() + if self.element_queue.empty(): + # The queue is empty. We check the current state. + self.read_counter.check_step() + with self.read_counter: + element = self.element_queue.get() + else: + element = self.element_queue.get() + if element is READER_THREAD_IS_DONE_SENTINEL: num_readers_finished += 1 if num_readers_finished == self.num_reader_threads: @@ -150,11 +184,13 @@ def __iter__(self): def get_iterator_fn_for_sources( - sources, max_reader_threads=MAX_SOURCE_READER_THREADS): + sources, max_reader_threads=MAX_SOURCE_READER_THREADS, read_counter=None): """Returns callable that returns iterator over elements for given sources.""" def _inner(): return iter(PrefetchingSourceSetIterable( - sources, max_reader_threads=max_reader_threads)) + sources, + max_reader_threads=max_reader_threads, + read_counter=read_counter)) return _inner diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py index d243bbe4e6ee..296bed460d14 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py @@ -21,6 +21,8 @@ import time import unittest +import mock + from apache_beam.runners.worker import sideinputs @@ -43,6 +45,11 @@ def __init__(self, items): self.items = items self.entered = False self.exited = False + self._progress = mock.MagicMock() + self._progress.position.byte_offset = 0 + + def get_progress(self): + return self._progress def __iter__(self): return iter(self.items) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index f0527c6decc7..2c3e3784f7b7 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -174,6 +174,13 @@ cdef class StateSampler(object): # pythread doesn't support conditions. self.sampling_thread.join() + def current_state(self): + """Returns the current ScopedState. + + This operation is not thread safe, and should only be used to check, not to + update information in the current state.""" + return self.scoped_states_by_index[self.current_state_index] + def stop_if_still_running(self): if self.started and not self.finished: self.stop() diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py index bc56021520aa..a106e4378595 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py @@ -17,14 +17,19 @@ # This module is experimental. No backwards-compatibility guarantees. +from apache_beam.utils.counters import CounterName + class StateSampler(object): def __init__(self, *args, **kwargs): - pass + self._current_state = _FakeScopedState(self, 'unknown', 'unknown') def scoped_state(self, step_name, state_name=None, io_target=None): - return _FakeScopedState() + return _FakeScopedState(self, step_name, state_name) + + def current_state(self): + return self._current_state def start(self): pass @@ -41,11 +46,17 @@ def commit_counters(self): class _FakeScopedState(object): + def __init__(self, sampler, step_name, state_name): + self.name = CounterName(state_name + '-msecs', + step_name=step_name) + self.sampler = sampler + def __enter__(self): - pass + self.old_state = self.sampler.current_state() + self.sampler._current_state = self def __exit__(self, *unused_args): - pass + self.sampler._current_state = self.old_state def sampled_seconds(self): return 0 From 902ca7fbe38815e260902460b4cbed30ae83a354 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 5 Oct 2017 10:06:29 -0700 Subject: [PATCH 2/2] Fixing lint issues --- sdks/python/apache_beam/runners/worker/opcounters.py | 1 - sdks/python/apache_beam/runners/worker/sideinputs.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index 8109af8ec120..c997d23a39d1 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -21,7 +21,6 @@ from __future__ import absolute_import -import logging import math import random diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index ec5117c55533..b11ab3cfba65 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -89,7 +89,6 @@ def _get_source_position(self, range_tracker=None, reader=None): return range_tracker.position_at_fraction( range_tracker.fraction_consumed()) if range_tracker else 0 - def _reader_thread(self): # pylint: disable=too-many-nested-blocks try: @@ -105,7 +104,7 @@ def _reader_thread(self): return current_position = self._get_source_position(range_tracker=rt) - consumed_bytes = current_position - initial_position + consumed_bytes = current_position - initial_position self.read_counter.add_bytes_read(consumed_bytes) initial_position = initial_position + consumed_bytes