[BEAM-3645] add ParallelBundleManager #8872
Conversation
R: @robertwb
force-pushed from 039b305 to df0ef54
sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
Run Python PreCommit
force-pushed from 30cc663 to 1b4df97
Run Portable_Python PreCommit
    self._key_coder = pre_grouped_coder.key_coder()
    self._pre_grouped_coder = pre_grouped_coder
    self._post_grouped_coder = post_grouped_coder
    self._table = collections.defaultdict(list)
    self._table_count = 0
It'd be better to make this a method than a private attribute that must be updated every time the table is mutated.
It is unused and has been removed in the new commit.
@@ -691,13 +737,14 @@ def input_for(ptransform_id, input_id):
      if other_input not in deferred_inputs:
        deferred_inputs[other_input] = []
    # TODO(robertwb): merge results
-    last_result, splits = BundleManager(
+    last_result, splits = ParallelBundleManager(
Just looking at this now, we should probably be creating the bundle_manager just once and then re-using it everywhere. Then we could also decide which kind of bundle manager to create based on whether num_workers > 1.
I changed it to always use ParallelBundleManager; when num_workers = 1, inputs will not be split. Does this sound good?
@@ -139,6 +139,25 @@ def done(self):
    self._state = self.DONE_STATE


class _PartitionBuffer(object):
Is there any reason this is a class that has a single method rather than just a function?
This class is removed now.
    self._inputs = inputs

  def partition(self, n):
    v = list(self._inputs.values())[0]
The check, if any, should be inside the loop and per input. This code (implicitly) assumes exactly one element in inputs--we shouldn't do so without explicitly asserting that. Better would be to handle general dicts.
This is removed now.
  def partition(self, n):
    v = list(self._inputs.values())[0]
    if isinstance(v, list):
I still think it is more natural to use polymorphism rather than if-elif chains for known types. Is there a reason to not do this?
I added a _ListBuffer class to support partitioning for lists.
    self._table = None

  def __iter__(self):
Do we actually use raw __iter__ anywhere anymore? If not, remove it. If so, perhaps implement it as iter(self.partition(1)).
    return iter(self._grouped_output)

  def partition(self, n):
    self._output_ready()
Nit: lots of blank lines in this function (and above).
@@ -233,11 +268,13 @@ def encoded_items(self):


class FnApiRunner(runner.PipelineRunner):
  _num_workers = 1
This should be an instance, not class-level, variable. Class variables are global state.
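A toy illustration of the point (the class names here are hypothetical, not Beam code): a class-level attribute is shared by all instances, so mutating it acts as global state, while an instance attribute set in __init__ is isolated per object.

```python
class SharedRunner(object):
    _num_workers = 1  # class-level: one value shared by every instance

class PerInstanceRunner(object):
    def __init__(self, num_workers=1):
        self._num_workers = num_workers  # instance-level: isolated per runner

SharedRunner._num_workers = 4  # silently changes *all* SharedRunner instances
a, b = PerInstanceRunner(4), PerInstanceRunner()

print(SharedRunner()._num_workers)      # -> 4 (global state leaked)
print(a._num_workers, b._num_workers)   # -> 4 1 (independent)
```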
@@ -284,6 +322,8 @@ def run_pipeline(self, pipeline, options):
    pipeline.visit(DataflowRunner.group_by_key_input_visitor())
    self._bundle_repeat = self._bundle_repeat or options.view_as(
        pipeline_options.DirectOptions).direct_runner_bundle_repeat
    FnApiRunner._num_workers = max(FnApiRunner._num_workers, options.view_as(
Rather than max, it's probably better to have one take precedence. Alternatively, only have one way to set this (e.g. via the option).
Changed to support the option only.
    # Unique id for the instruction processing this bundle.
    BundleManager._uid_counter += 1
    if parallel_uid_counter:
Why this change? It seems we never need to pass the uid counter in externally.
force-pushed from 65c6b4e to 23f522a
@robert, thanks for your comments, they are very helpful and thoughtful.
force-pushed from 3972f49 to fafd01c
Run Python PreCommit
Very nice. Just some suggestions on cleaning things up and I think we're good to go.
      for idx, input in enumerate(self):
        groups[idx % n].append(input)

    return iter(groups)
Just return groups here, not iter(groups).
class _ListBuffer(list):
  """Used to support partitioning of a list."""
  def partition(self, n):
    n = min(n, len(self))
It might be cleaner to always return n groups, so the caller doesn't have to deal with less. (The caller can always detect if some groups are empty.)
I think we still need it, because if we return n groups with some empty groups, we will pass an additional {target: []} to process_bundle, which is not expected. Some tests hang without this.
We already need to be able to support empty groups (e.g. initially all timers are empty, when timers are fired all "normal" inputs are empty, some PCollections are actually empty). If there are hanging tests, we should investigate these.
In terms of hanging tests, the key is that the worker waits for a data stream for every InputPortOperator it has in the graph. So there must be a set of elements (even if it is empty) to send to each of these. Maybe this invariant is being broken somewhere.
Here is my investigation.
With n = min(n, len(self)), we generate the following partitioned inputs:
[{'Create/Read/Impulse': [b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x00'], 'ParDo(BufferDoFn)_timers_to_read_timer/Read': []}, {}]
With fixed n, we get the following partitioned inputs:
[{'Create/Read/Impulse': [b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x00'], 'ParDo(BufferDoFn)_timers_to_read_timer/Read': []}, {'Create/Read/Impulse': []}]
In the first case, the second bundle is empty, which can be handled automatically. In the second case, the second bundle has an input but no timer, so workers wait for timers that will never be sent with that bundle and get stuck. This happens in all tests with timers.
There were multiple occurrences of n = min(n, len(self)); did you change all of them? What I'm worried about is (say) one buffer having 2 elements and another 3 elements, which we can't handle automatically (as the third dict would not be empty--we'd need to have the lists, even empty lists, for all inputs for all dicts).
I got it. The problem was here:
def partition(self, n):
  return [self[k::n] for k in range(n)] if self else [[]]
After I changed it to return n empty lists, it all worked. I made the same change to GroupingBuffer:
def partition(self, n):
  return [self[k::n] for k in range(n)] if self else [[] for _ in range(n)]
Thanks very much for your critical thinking!
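The fix from this thread can be demonstrated in isolation with a plain function (a sketch, not the actual Beam method): it always returns exactly n groups, padding with empty lists, so every bundle receives a (possibly empty) input for every source.

```python
def partition(items, n):
    """Round-robin partition that always yields exactly n groups,
    including empty ones, as discussed in the thread above."""
    return [items[k::n] for k in range(n)] if items else [[] for _ in range(n)]

print(partition([1, 2, 3, 4, 5], 2))  # -> [[1, 3, 5], [2, 4]]
print(partition([], 2))               # -> [[], []]
```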
  def partition(self, n):
    n = min(n, len(self))
    # for an empty list, return iter([[]])
    groups = [[] for _ in range(max(n, 1))]
Python slicing could be a useful trick here: https://docs.python.org/2.3/whatsnew/section-slices.html
The notation some_list[start::n] gives every nth element, starting at start. Thus you could just do
def partition(self, n):
  return [self[k::n] for k in range(n)]
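A quick demonstration of the extended-slice trick suggested above:

```python
data = list(range(10))
n = 3
# data[k::n] takes every n-th element starting at offset k,
# yielding n round-robin groups in one comprehension.
groups = [data[k::n] for k in range(n)]
print(groups)  # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```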
    self._grouped_output = [output_stream.get()]
    coder_impl.encode_to_stream(wkvs, output_stream_list[idx % n], True)
    for output_stream in output_stream_list:
      self._grouped_output.append([output_stream.get()])
    self._table = None
    return iter(self._grouped_output)
You can drop the iter(); it was just needed for the result from __iter__.
@@ -181,14 +196,32 @@ def __iter__(self):
    windowed_key_values = trigger_driver.process_entire_key
    coder_impl = self._post_grouped_coder.get_impl()
    key_coder_impl = self._key_coder.get_impl()
    for encoded_key, windowed_values in self._table.items():
    n = min(n, len(self._table))
As above.
@@ -1343,6 +1381,7 @@ def __init__(
    self._controller = controller
    self._get_buffer = get_buffer
    self._get_input_coder_impl = get_input_coder_impl
    self._num_workers = num_workers
This argument can now be omitted.
This is used at L:1540. The default num_workers comes from self._num_workers, and it can be overridden if num_workers is passed to process_bundle.
I see. We should capture this in the constructor of ParallelBundleProcessor (which may call its superclass constructor), not here.
class ParallelBundleManager(BundleManager):

  def _check_inputs_split(self, expected_outputs):
    # We skip splitting inputs when timer is set, because operations are not
See if the suggestion below fixes this.
    if self._check_inputs_split(expected_outputs):
      for name, input in inputs.items():
        for part in input.partition(num_workers):
          param_list.append(({name: part}, expected_outputs))
OK, I think this is where the bug above is. Timers are a case where there is more than one set of inputs, but this here adds a new element to param_list for every input for every part.
Instead, I think you need something like
partitioned_inputs = [{} for _ in range(num_workers)]
for name, input in inputs.items():
  for ix, part in enumerate(input.partition(num_workers)):
    partitioned_inputs[ix][name] = part
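The suggested restructuring can be sketched end to end with a minimal stand-in buffer (the input names below are illustrative): one dict per worker, where every worker receives a (possibly empty) list for every named input, preserving the invariant that each InputPortOperator gets a data stream.

```python
class _ListBuffer(list):
    """Minimal stand-in for the partitionable buffer used in the PR."""
    def partition(self, n):
        return [self[k::n] for k in range(n)] if self else [[] for _ in range(n)]

num_workers = 2
inputs = {
    'Create/Read/Impulse': _ListBuffer([b'a', b'b', b'c']),
    'timers/Read': _ListBuffer([]),  # hypothetical timer input, often empty
}

# One dict per worker, keyed by input name for every worker.
partitioned_inputs = [{} for _ in range(num_workers)]
for name, input in inputs.items():
    for ix, part in enumerate(input.partition(num_workers)):
        partitioned_inputs[ix][name] = part

print(partitioned_inputs)
# -> [{'Create/Read/Impulse': [b'a', b'c'], 'timers/Read': []},
#     {'Create/Read/Impulse': [b'b'], 'timers/Read': []}]
```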
This worked fantastically!!
    for result, split_result in executor.map(lambda p: BundleManager(
        self._controller, self._get_buffer, self._get_input_coder_impl,
        self._bundle_descriptor, self._progress_frequency,
        self._registered).process_bundle(*p), param_list):
As expected_outputs never changes, you could simplify this by doing
executor.map(lambda part: BundleManager(...).process_bundle(part, expected_outputs), partitioned_inputs)
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.EMBEDDED_PYTHON_GRPC)))
p._options.view_as(DirectOptions).direct_num_workers = 2
Generally it's preferable to create a pipeline options object and pass it to the Pipeline constructor instead of mutating it after the fact.
force-pushed from 629b384 to 5ec9d18
@@ -1506,6 +1534,39 @@ def process_bundle(self, inputs, expected_outputs):
    return result, split_results


class ParallelBundleManager(BundleManager):

  def process_bundle(self, inputs, expected_outputs, num_workers=None):
Do we need two separate ways of passing num_workers?
I added it in case we want to change num_workers for a certain bundle only, instead of changing it at the BundleManager level. Do we have a case that processes multiple bundles in parallel with different num_workers? I have now changed it back to support only self._num_workers; I would like your advice on whether we even need to consider such cases.
While I wouldn't rule out finding a reason to support such a thing later, I would say that it's cleaner to add such support if/once we need it rather than now. It's generally much easier to read/follow code if parameters come from only one place.
force-pushed from f9ca8ee to 4729a45
Run Python PreCommit
LGTM, assuming all tests pass. Thanks.
class _ListBuffer(list):
  """Used to support partitioning of a list."""
  def partition(self, n):
    return [self[k::n] for k in range(n)] if self else [[] for _ in range(n)]
You can drop the if self else [[] for _ in range(n)]; the slicing will work fine on an empty list as well.
@@ -1329,7 +1356,7 @@ class BundleManager(object):

  def __init__(
      self, controller, get_buffer, get_input_coder_impl, bundle_descriptor,
-     progress_frequency=None, skip_registration=False):
+     num_workers, progress_frequency=None, skip_registration=False):
Delete this argument.
  def __init__(
      self, controller, get_buffer, get_input_coder_impl, bundle_descriptor,
      num_workers, progress_frequency=None, skip_registration=False):
Generally it'd be preferable to pass num_workers as the last argument, by keyword, so the signatures between super and subclasses agree.
force-pushed from 4729a45 to d8a8423
Run Python PreCommit - passed.
Run RAT PreCommit
Run Python_PVR_Flink PreCommit
Run Portable_Python PreCommit
Run Python PreCommit - failed.
Run Python PreCommit - passed.
Run Python PreCommit - tests running on Dataflow were not able to run because of a Dataflow issue. No failures with other tests.
Run Python PreCommit - tests got stuck.
Run Python PreCommit - no FnAPI-related errors; the test failed because of an env issue on PY37. https://scans.gradle.com/s/7oiypze4pnbr6/console-log?task=:sdks:python:test-suites:tox:py37:testPython37
Run Python PreCommit - passed
Run Python PreCommit - timeout with preCommitPy35. No other failures.
Run Python PreCommit - failed to build preCommitPy37. No other failures.
Run Python PreCommit - passed.
force-pushed from 4a8c6ec to 8766b73
force-pushed from 8766b73 to f391aa4
Add ParallelBundleManager class.
This class splits _GroupingBuffer into N pieces to reduce data size and improve processing performance.