Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3042] Add tracking of bytes read / time spent when reading side inputs #3943

Closed
wants to merge 2 commits into from

Conversation

pabloem
Copy link
Member

@pabloem pabloem commented Oct 4, 2017

Testing changes to track msecs and bytes spent while reading from side inputs.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 6e2c396 on pabloem:sicounters into ** on apache:master**.

@pabloem
Copy link
Member Author

pabloem commented Oct 6, 2017

@chamikaramj
Hi Cham. I'm prototyping this change to go in fairly soon. I want to track bytes read from side inputs and msecs blocked waiting for side inputs. Could you take a look?

@chamikaramj
Copy link
Contributor

cc: @charlesccychen

@pabloem
Copy link
Member Author

pabloem commented Oct 10, 2017

Cham has advised me to find a reviewer that is more familiar with side inputs.
r: @charlesccychen could you take a look?

A quick summary of the change:
In the Insights team we're working on a project to keep track of time spent / bytes read from side inputs, so that users may be better informed of bottlenecks in their pipelines.
For this, we track msecs spent blocked waiting for side inputs (so, only when the Queue is empty ahead of time).

An interesting, and perhaps counter-intuitive scenario that we're planning for is the following dummy example:

si_iterable = AsIter(p | Create(long_list))

def emit_side_input(e, si_iter):
  yield si_iter

pcoll = p 
  | Create([0]) 
  | 'step1' >> beam.Map(emit_side_input, si_iter=si_iterable) 
  | 'step2' >> beam.FlatMap(lambda x: list(x))

In this case, step1 emits the iterable, and step2 is the one that actually may be blocked waiting for side inputs. That's why we need to check the current step when the thread blocks.

I'd like a review of the approach, and I'll come back with benchmark results soon. Hope it's not much trouble, Charles. Thanks.
Best
-P.

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments.

Could you create a JIRA with appropriate context ?

@@ -78,29 +82,51 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)

def _get_source_position(self, range_tracker=None, reader=None):
if reader:
return reader.get_progress().position.byte_offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about side input sources that do not have byte offsets ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need Charles or Robert to input here. In my understanding, side inputs always use Avro files - so they are always byte-offset-based sources. Would this be correct? @charlesccychen

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they might now, but we should verify in the case one uses the result of a read (e.g. Create or ReadTextIO) directly as a side input.

Even if it is the case, best to assert this assumption explicitly somewhere.

self.sources = sources
self.num_reader_threads = min(max_reader_threads, len(self.sources))
self.read_counter = read_counter or opcounters.TransformIoCounter()
# self.read_counter = opcounters.TransformIoCounter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this commented out ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to allow to test a no-op counter vs the implementation. It will be removed before merging.

@@ -42,6 +43,58 @@ def value(self):
return self._value


class TransformIoCounter(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add documentation ? Also is this a user-facing interface ? If so this should be discussed more broadly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't be a user-facing interface. It's meant to be used by the IO infrastructure classes. I'll add documentation in a bit.

return reader.get_progress().position.byte_offset
else:
return range_tracker.position_at_fraction(
range_tracker.fraction_consumed()) if range_tracker else 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to return

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry forgot to complete this.

Should be: Is it valid to return 0 here should that case be an error ?

@@ -78,29 +82,51 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)

def _get_source_position(self, range_tracker=None, reader=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when does this a get range_tracker vs reader ? Please clarify with a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Is there a way to avoid this either/or altogether?

# Inputs are 1-indexed, so we add 1 to i in the side input id
counters.side_input_id(self.operation_name, i+1))
iterator_fn = sideinputs.get_iterator_fn_for_sources(
sources, read_counter=si_counter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to use the same counter for all the sources here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. A single side input may have different sources, but we want to track bytes/msecs for the side input, not per-source.

@chamikaramj
Copy link
Contributor

R: @robertwb or @charlesccychen

@pabloem pabloem changed the title Adding tracking for bytes and msecs spent while reading from side inputs [BEAM-3042] Add tracking of bytes read / time spent when reading side inputs Oct 10, 2017
Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments.

@@ -78,29 +82,51 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)

def _get_source_position(self, range_tracker=None, reader=None):
if reader:
return reader.get_progress().position.byte_offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they might now, but we should verify in the case one uses the result of a read (e.g. Create or ReadTextIO) directly as a side input.

Even if it is the case, best to assert this assumption explicitly somewhere.

@@ -78,29 +82,51 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)

def _get_source_position(self, range_tracker=None, reader=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Is there a way to avoid this either/or altogether?

self._state_sampler = state_sampler
self._bytes_read_cache = 0
self.io_target = io_target
self.check_step()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do? Documentation on the parent class would be helpful even if it's not user-facing. The implementation below doesn't look like it's checking stuff.

def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
self.exit()

def enter(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these needed in addition to enter and exit?

@@ -128,7 +154,14 @@ def __iter__(self):
num_readers_finished = 0
try:
while True:
element = self.element_queue.get()
if self.element_queue.empty():
# The queue is empty. We check the current state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following the relationship here.

returns_windowed_values = reader.returns_windowed_values
for value in reader:
if self.has_errored:
# If any reader has errored, just return.
# If any reader has errored, just return.`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra backtick.

if self.has_errored:
# If any reader has errored, just return.
return

current_position = self._get_source_position(range_tracker=rt)
consumed_bytes = current_position - initial_position
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we just assume position is in bytes, and can be subtracted?

@pabloem pabloem closed this Dec 11, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants