
[BEAM-3042] Add tracking of bytes read / time spent when reading side inputs #3943

Closed · wants to merge 2 commits
53 changes: 53 additions & 0 deletions sdks/python/apache_beam/runners/worker/opcounters.py
@@ -25,6 +25,7 @@
import random

from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName

# This module is experimental. No backwards-compatibility guarantees.

@@ -42,6 +43,58 @@ def value(self):
return self._value


class TransformIoCounter(object):
Contributor:
Can you add documentation? Also, is this a user-facing interface? If so, this should be discussed more broadly.

Member Author:
This won't be a user-facing interface. It's meant to be used by the IO infrastructure classes. I'll add documentation in a bit.


def add_bytes_read(self, n):
pass

def __enter__(self):
self.enter()

def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
self.exit()

def enter(self):
Contributor:
Are these needed in addition to __enter__ and __exit__?

pass

def exit(self):
pass

def check_step(self):
pass


class SideInputReadCounter(TransformIoCounter):

def __init__(self, counter_factory, state_sampler, io_target):
self._counter_factory = counter_factory
self._state_sampler = state_sampler
self._bytes_read_cache = 0
self.io_target = io_target
self.check_step()
Contributor:
What does this do? Documentation on the parent class would be helpful even if it's not user-facing. The implementation below doesn't look like it's actually checking anything.


def check_step(self):
current_state = self._state_sampler.current_state()
operation_name = current_state.name.step_name
self.scoped_state = self._state_sampler.scoped_state(
operation_name, 'read-sideinput', io_target=self.io_target)
self.bytes_read_counter = self._counter_factory.get_counter(
CounterName('bytes-read',
step_name=operation_name,
io_target=self.io_target),
Counter.SUM)

def add_bytes_read(self, n):
if n > 0:
self.bytes_read_counter.update(n)

def enter(self):
self.scoped_state.__enter__()

def exit(self):
self.scoped_state.__exit__(None, None, None)


class OperationCounters(object):
"""The set of basic counters to attach to an Operation."""

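For readers following the thread above: the interface is intentionally small. add_bytes_read feeds a per-side-input bytes-read counter, enter/exit (also reachable through the context-manager protocol) scope time to a read-sideinput state, and check_step re-binds both to whichever step is currently consuming the side input. A hedged usage sketch of how worker IO code is expected to drive it (not part of this diff; the reader, counter, and element_queue objects are placeholders):

def producer(reader, counter, element_queue):
  # Reader thread: report byte deltas as values are pulled from the source.
  last_offset = reader.get_progress().position.byte_offset
  for value in reader:
    new_offset = reader.get_progress().position.byte_offset
    counter.add_bytes_read(new_offset - last_offset)   # feeds 'bytes-read'
    last_offset = new_offset
    element_queue.put(value)

def consumer(counter, element_queue):
  # Consuming thread: attribute blocking time to the step currently reading.
  counter.check_step()
  with counter:                     # scoped 'read-sideinput' msecs state
    return element_queue.get()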
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.py
@@ -42,6 +42,7 @@
from apache_beam.transforms.combiners import PhasedCombineFnExecutor
from apache_beam.transforms.combiners import curry_combine_fn
from apache_beam.transforms.window import GlobalWindows
from apache_beam.utils import counters
from apache_beam.utils.windowed_value import WindowedValue

# Allow some "pure mode" declarations.
@@ -281,7 +282,7 @@ def _read_side_inputs(self, tags_and_types):
# Note that for each tag there could be several read operations in the
# specification. This can happen for instance if the source has been
# sharded into several files.
for side_tag, view_class, view_options in tags_and_types:
for i, (side_tag, view_class, view_options) in enumerate(tags_and_types):
sources = []
# Using the side_tag in the lambda below will trigger a pylint warning.
# However in this case it is fine because the lambda is used right away
@@ -293,7 +294,13 @@
if not isinstance(si, operation_specs.WorkerSideInputSource):
raise NotImplementedError('Unknown side input type: %r' % si)
sources.append(si.source)
iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)

si_counter = opcounters.SideInputReadCounter(
self.counter_factory, self.state_sampler,
# Inputs are 1-indexed, so we add 1 to i in the side input id
counters.side_input_id(self.operation_name, i+1))
iterator_fn = sideinputs.get_iterator_fn_for_sources(
sources, read_counter=si_counter)
Contributor:
Do we want to use the same counter for all the sources here?

Member Author:
Yes. A single side input may be backed by several sources, but we want to track bytes/msecs per side input, not per source.


# Backwards compatibility for pre BEAM-733 SDKs.
if isinstance(view_options, tuple):
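On the indexing comment above (a hedged sketch, not part of this diff): each side input of a step gets its own io_target, so bytes and msecs are keyed per side input rather than per source, matching the exchange above. Assuming the counters helpers referenced in this PR and a hypothetical step name 's4', the counter name for the first side input would be built roughly like:

from apache_beam.utils import counters
from apache_beam.utils.counters import CounterName

io_target = counters.side_input_id('s4', 1)   # first side input; 1-indexed as above
name = CounterName('bytes-read', step_name='s4', io_target=io_target)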
49 changes: 42 additions & 7 deletions sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -24,6 +24,7 @@
import traceback

from apache_beam.io import iobase
from apache_beam.runners.worker import opcounters
from apache_beam.transforms import window

# This module is experimental. No backwards-compatibility guarantees.
@@ -51,9 +52,12 @@ class PrefetchingSourceSetIterable(object):
"""Value iterator that reads concurrently from a set of sources."""

def __init__(self, sources,
max_reader_threads=MAX_SOURCE_READER_THREADS):
max_reader_threads=MAX_SOURCE_READER_THREADS,
read_counter=None):
self.sources = sources
self.num_reader_threads = min(max_reader_threads, len(self.sources))
self.read_counter = read_counter or opcounters.TransformIoCounter()
# self.read_counter = opcounters.TransformIoCounter()
Contributor:
Why is this commented out?

Member Author:
This is there to allow testing a no-op counter against the real implementation. It will be removed before merging.


# Queue for sources that are to be read.
self.sources_queue = Queue.Queue()
@@ -78,29 +82,51 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)

def _get_source_position(self, range_tracker=None, reader=None):
Contributor:
When does this get a range_tracker vs. a reader? Please clarify with a comment.

Contributor:
+1. Is there a way to avoid this either/or altogether?

if reader:
return reader.get_progress().position.byte_offset
Contributor:
What about side input sources that do not have byte offsets?

Member Author:
I need Charles or Robert to weigh in here. In my understanding, side inputs always use Avro files, so they are always byte-offset-based sources. Would this be correct? @charlesccychen

Contributor:
I think they might now, but we should verify the case where one uses the result of a read (e.g. Create or ReadTextIO) directly as a side input.

Even if that is the case, it is best to assert this assumption explicitly somewhere.

else:
return range_tracker.position_at_fraction(
range_tracker.fraction_consumed()) if range_tracker else 0
Contributor:
Is it valid to return

Contributor:
Sorry, forgot to complete this. Should be: Is it valid to return 0 here, or should that case be an error?


def _reader_thread(self):
# pylint: disable=too-many-nested-blocks
try:
while True:
try:
source = self.sources_queue.get_nowait()
if isinstance(source, iobase.BoundedSource):
for value in source.read(source.get_range_tracker(None, None)):
rt = source.get_range_tracker(None, None)
initial_position = self._get_source_position(range_tracker=rt)
for value in source.read(rt):
if self.has_errored:
# If any reader has errored, just return.
return

current_position = self._get_source_position(range_tracker=rt)
consumed_bytes = current_position - initial_position
Contributor:
Do we just assume the position is in bytes and can be subtracted?

self.read_counter.add_bytes_read(consumed_bytes)
initial_position = initial_position + consumed_bytes

if isinstance(value, window.WindowedValue):
self.element_queue.put(value)
else:
self.element_queue.put(_globally_windowed(value))
else:
# Native dataflow source.
# Native dataflow source / testing FakeSource
with source.reader() as reader:
initial_offset = self._get_source_position(reader=reader)

returns_windowed_values = reader.returns_windowed_values
for value in reader:
if self.has_errored:
# If any reader has errored, just return.
# If any reader has errored, just return.`
Contributor:
Extra backtick.

return

new_offset = self._get_source_position(reader=reader)
self.read_counter.add_bytes_read(new_offset - initial_offset)
initial_offset = new_offset

if returns_windowed_values:
self.element_queue.put(value)
else:
@@ -128,7 +154,14 @@ def __iter__(self):
num_readers_finished = 0
try:
while True:
element = self.element_queue.get()
if self.element_queue.empty():
# The queue is empty. We check the current state.
Contributor:
I'm not following the relationship here.

self.read_counter.check_step()
with self.read_counter:
element = self.element_queue.get()
else:
element = self.element_queue.get()

if element is READER_THREAD_IS_DONE_SENTINEL:
num_readers_finished += 1
if num_readers_finished == self.num_reader_threads:
@@ -150,11 +183,13 @@


def get_iterator_fn_for_sources(
sources, max_reader_threads=MAX_SOURCE_READER_THREADS):
sources, max_reader_threads=MAX_SOURCE_READER_THREADS, read_counter=None):
"""Returns callable that returns iterator over elements for given sources."""
def _inner():
return iter(PrefetchingSourceSetIterable(
sources, max_reader_threads=max_reader_threads))
sources,
max_reader_threads=max_reader_threads,
read_counter=read_counter))
return _inner


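On the byte-offset questions above: if some side-input sources turn out not to report byte-based positions, one defensive option (a sketch only, not part of this PR; it assumes nothing beyond the reader/range_tracker attributes already used in the diff) is to fall back to a zero position, so byte deltas become no-ops instead of raising:

def _get_source_position_safe(range_tracker=None, reader=None):
  # Fall back to 0 when no byte-based position is available; a zero delta is
  # ignored by add_bytes_read, so counting degrades gracefully.
  try:
    if reader is not None:
      return reader.get_progress().position.byte_offset or 0
    if range_tracker is not None:
      return range_tracker.position_at_fraction(
          range_tracker.fraction_consumed()) or 0
  except (AttributeError, NotImplementedError):
    pass
  return 0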
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -21,6 +21,8 @@
import time
import unittest

import mock

from apache_beam.runners.worker import sideinputs


@@ -43,6 +45,11 @@ def __init__(self, items):
self.items = items
self.entered = False
self.exited = False
self._progress = mock.MagicMock()
self._progress.position.byte_offset = 0

def get_progress(self):
return self._progress

def __iter__(self):
return iter(self.items)
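A possible companion test (a sketch only; RecordingCounter is a hypothetical helper and not part of this PR) would pass a recording counter through get_iterator_fn_for_sources and assert on the byte deltas reported while iterating:

class RecordingCounter(object):
  # Records byte deltas; otherwise mimics the no-op TransformIoCounter.

  def __init__(self):
    self.bytes_read = 0

  def add_bytes_read(self, n):
    if n > 0:
      self.bytes_read += n

  def check_step(self):
    pass

  def __enter__(self):
    pass

  def __exit__(self, *unused_args):
    pass

With FakeSource.get_progress advancing position.byte_offset between elements (instead of pinning it to 0 as above), the test could assert that RecordingCounter.bytes_read ends up equal to the total advance.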
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -174,6 +174,13 @@ cdef class StateSampler(object):
# pythread doesn't support conditions.
self.sampling_thread.join()

def current_state(self):
"""Returns the current ScopedState.

This operation is not thread safe, and should only be used to check, not to
update information in the current state."""
return self.scoped_states_by_index[self.current_state_index]

def stop_if_still_running(self):
if self.started and not self.finished:
self.stop()
19 changes: 15 additions & 4 deletions sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -17,14 +17,19 @@

# This module is experimental. No backwards-compatibility guarantees.

from apache_beam.utils.counters import CounterName


class StateSampler(object):

def __init__(self, *args, **kwargs):
pass
self._current_state = _FakeScopedState(self, 'unknown', 'unknown')

def scoped_state(self, step_name, state_name=None, io_target=None):
return _FakeScopedState()
return _FakeScopedState(self, step_name, state_name)

def current_state(self):
return self._current_state

def start(self):
pass
@@ -41,11 +46,17 @@ def commit_counters(self):

class _FakeScopedState(object):

def __init__(self, sampler, step_name, state_name):
self.name = CounterName(state_name + '-msecs',
step_name=step_name)
self.sampler = sampler

def __enter__(self):
pass
self.old_state = self.sampler.current_state()
self.sampler._current_state = self

def __exit__(self, *unused_args):
pass
self.sampler._current_state = self.old_state

def sampled_seconds(self):
return 0
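
To make the fake-sampler change concrete (a sketch assuming this PR is applied; the step and state names are placeholders): entering a scoped state makes it the current state and exiting restores the previous one, which is exactly what SideInputReadCounter.check_step relies on to attribute work to the right step.

from apache_beam.runners.worker import statesampler_fake

sampler = statesampler_fake.StateSampler()
outer = sampler.scoped_state('step1', 'process')
with outer:
  assert sampler.current_state() is outer
  inner = sampler.scoped_state('step1', 'read-sideinput')
  with inner:
    assert sampler.current_state() is inner
  assert sampler.current_state() is outer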