
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner #11270

Merged
merged 4 commits into apache:master from fn-ref-more on Apr 21, 2020

Conversation

pabloem (Member) commented Mar 30, 2020:

r: @robertwb

These changes are mostly reshuffling of code.

Each commit is a logical unit, so each commit can be reviewed separately. The commit message explains what each commit does:

  • [BEAM-9608] BundleManagers use BundleContextManager for configuration - modifies the BundleManagers to receive only a BundleContextManager carrying most of their configuration, and adds a dry_run option for processing bundles without writing to pcoll_buffers.
  • [BEAM-9639] Storing side inputs after producer execution, not before consumption - ensures that side inputs are stored in state right after they are computed, not just before they are consumed. This will be useful in streaming, so that each bundle's inputs are eagerly available.
    • Ensuring downstream side inputs are calculated on fully expanded graph - an accessory to the previous commit: it ensures that during graph translation, downstream_side_inputs are annotated after SDFs are expanded.
  • [BEAM-9639] Separate Stage and Bundle execution. Improve typing. - separates the code for executing a stage (context creation, committing of side inputs, scheduling bundles until there are no deferred inputs) from the code for executing a single bundle of that stage (pushing data to the worker, collecting the bundle's deferred inputs).
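The dry_run idea from the first commit above can be sketched as follows. This is a hypothetical, heavily simplified illustration (the function and variable names are stand-ins, not the actual FnApiRunner API): the bundle is still processed and its outputs returned, but nothing is written to the shared pcoll_buffers.

```python
def process_bundle(bundle_context, pcoll_buffers, dry_run=False):
    # Stand-in for running the bundle on a worker and collecting its outputs.
    outputs = {"out_pcoll": [b"elem1", b"elem2"]}
    if not dry_run:
        # Only a real run commits the outputs to the pcollection buffers.
        for pcoll_id, elements in outputs.items():
            pcoll_buffers.setdefault(pcoll_id, []).extend(elements)
    return outputs
```

With dry_run=True the caller still sees the bundle's outputs, but the buffers stay untouched, which is what makes it safe to "rehearse" a bundle without side effects.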

Notes/todos from #11229

  • It's odd that _run_stage no longer takes the stage to run as a parameter. Perhaps bundle_context_manager (and its class?) should be named stage_context or similar?
  • On this note, perhaps it makes sense to break FnApiRunner into a (mostly stateless) runner that can execute multiple pipelines, and an executor (with methods like run_stage) that might be stateful and is initialized with, and tasked with running, a single pipeline. Much of what is in context(s) would become state of self in this new object.
  • I was bitten by the fact that it is an error to access process_bundle_descriptor before _extract_outputs is called. This should be clearly documented (and similarly for the other lazy attributes in this class). Given that it is called externally to this class and can't easily be checked, this makes me wonder whether the boundary of encapsulation needs to be adjusted here.
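One way to address the last note above is to guard the lazy attribute so that early access fails loudly with a clear message. This is only a sketch with hypothetical, simplified names, not the actual class from the PR:

```python
class BundleContextManager:
    """Sketch of guarding a lazily-populated attribute."""

    def __init__(self):
        self._process_bundle_descriptor = None

    def _extract_outputs(self):
        # In the real class this would also compute stage outputs; here we
        # only mark the descriptor as available.
        self._process_bundle_descriptor = object()

    @property
    def process_bundle_descriptor(self):
        # Fail with an explicit message instead of surfacing a confusing
        # downstream error when the attribute is read too early.
        if self._process_bundle_descriptor is None:
            raise RuntimeError(
                "process_bundle_descriptor is only valid after "
                "_extract_outputs() has been called.")
        return self._process_bundle_descriptor
```

The property turns a silent ordering bug into an immediate, self-describing error, which also serves as documentation of the attribute's lifecycle.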



@pabloem pabloem force-pushed the fn-ref-more branch 2 times, most recently from df9cc9b to f445a8d Compare March 31, 2020 22:11
pabloem (Member, Author) commented Mar 31, 2020:

Run Portable_Python PreCommit

pabloem (Member, Author) commented Apr 1, 2020:

Run Python PreCommit

@pabloem pabloem changed the title [WIP] Improvements for FnApiRunner [BEAM-9639][BEAM-9608] Improvements for FnApiRunner Apr 1, 2020
@pabloem pabloem marked this pull request as ready for review April 1, 2020 15:29
@pabloem pabloem requested a review from robertwb April 1, 2020 18:47
pabloem (Member, Author) commented Apr 1, 2020:

this latest commit is purely aesthetic

pabloem (Member, Author) commented Apr 1, 2020:

Run Python PreCommit

pabloem (Member, Author) commented Apr 7, 2020:

@robertwb ptal

pabloem (Member, Author) commented Apr 10, 2020:

attempting to rebase. let's see how far that takes me...

@pabloem pabloem force-pushed the fn-ref-more branch 2 times, most recently from ecf34ae to 82828b3 Compare April 11, 2020 18:04
pabloem (Member, Author) commented Apr 11, 2020:

Run Python PreCommit

2 similar comments
pabloem (Member, Author) commented Apr 11, 2020:

Run Python PreCommit

pabloem (Member, Author) commented Apr 11, 2020:

Run Python PreCommit

robertwb (Contributor) left a comment:

I've read through the entire PR; here are some initial comments.

Thanks for splitting things into logical commits. (FWIW, you can put the descriptions into the commits as well.)

@@ -326,8 +327,8 @@ def commit_side_inputs_to_state(
data_side_input, # type: DataSideInput
):
# type: (...) -> None
for (consuming_transform_id, tag), (buffer_id, func_spec) \
in data_side_input.items():
for (consuming_transform_id, tag), (buffer_id,
robertwb (Contributor):

I wonder if

((consuming_transform_id, tag), (buffer_id, func_spec))

would make both yapf and humans happy.
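The fully parenthesized target list suggested above is valid Python and unambiguous to read. A small self-contained illustration (the dictionary contents here are made up for the example):

```python
# Hypothetical data in the shape of a DataSideInput mapping:
# (consumer transform id, tag) -> (buffer id, access-pattern spec).
data_side_input = {
    ("ParDo(Consume)", "side0"): (b"buffer-1", "iterable-access"),
}

pairs = []
# Parenthesizing the whole target list makes the nested unpacking
# explicit without a backslash continuation.
for ((consuming_transform_id, tag),
     (buffer_id, func_spec)) in data_side_input.items():
    pairs.append((consuming_transform_id, tag, buffer_id, func_spec))
```

(As the reply below notes, in practice the formatter still chose a different wrapping for the real code.)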

pabloem (Member, Author):

it did not : ( hehe

translations.create_buffer_id(pcoll), access_pattern)
self.execution_context.commit_side_inputs_to_state(data_side_input)

def extract_bundle_inputs(self):
robertwb (Contributor):

extract_bundle_inputs_and_outputs?

pabloem (Member, Author):

done

for timer_family_id in payload.timer_family_specs.keys():
expected_timer_output[(transform.unique_name, timer_family_id)] = (
create_buffer_id(timer_family_id, 'timers'))
return data_input, data_output, expected_timer_output
robertwb (Contributor):

Update docs to match.

pabloem (Member, Author):

done

@@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
state_api_service_descriptor=self.state_api_service_descriptor(),
timer_api_service_descriptor=self.data_api_service_descriptor())

def commit_output_views_to_state(self):
robertwb (Contributor):

Not sure what "output views" means. Maybe call this commit_side_inputs_to_state as well?

pabloem (Member, Author):

done


def fuse(self, other):
# type: (Stage) -> Stage
return Stage(
"(%s)+(%s)" % (self.name, other.name),
self.transforms + other.transforms,
union(self.downstream_side_inputs, other.downstream_side_inputs),
self._fuse_downstream_side_inputs(other),
robertwb (Contributor):

Nit: this sounds like it mutates self.

pabloem (Member, Author):

Renamed to _get_fused_downstream_side_inputs. Thoughts?

# SideInputId is identified by a consumer ParDo + tag.
SideInputId = Tuple[str, str]

DataSideInput = Dict[SideInputId,
robertwb (Contributor):

What does the value represent?

pabloem (Member, Author):

The value is a tuple with the encoded data. I've moved these to translations.py, and updated the comment

res = dict(self.downstream_side_inputs)
for si, other_si_ids in other.downstream_side_inputs.items():
if si in res:
res[si] = union(res[si], other_si_ids)
robertwb (Contributor):

So this is actually a dict mapping to sets?

pabloem (Member, Author):

Woah, this is a bug. downstream_side_inputs is a dictionary mapping to dictionaries:

Dict[output PCollection, Dict[side input ID, access pattern]]

where a side input ID is Tuple[consumer ptransform, input index]. Added the appropriate annotations, and fixed the bug.
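Given that shape, the fix is to merge the inner dictionaries rather than treat the values as opaque sets. A sketch under those assumptions (hypothetical helper name; the real code lives in the fuse path of translations.py):

```python
def merge_downstream_side_inputs(a, b):
    """Merge two Dict[pcoll_id, Dict[SideInputId, access_pattern]] maps.

    SideInputId is a (consumer transform id, tag) tuple.
    """
    # Copy the inner dicts so neither input mapping is mutated.
    res = {pcoll: dict(side_inputs) for pcoll, side_inputs in a.items()}
    for pcoll, side_inputs in b.items():
        # setdefault + update merges the inner dicts key by key,
        # instead of overwriting one whole inner dict with the other.
        res.setdefault(pcoll, {}).update(side_inputs)
    return res
```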


class Stage(object):
"""A set of Transforms that can be sent to the worker for processing."""
def __init__(self,
name, # type: str
transforms, # type: List[beam_runner_api_pb2.PTransform]
downstream_side_inputs=None, # type: Optional[FrozenSet[str]]
downstream_side_inputs=None, # type: Optional[Dict[str, SideInputId]]
robertwb (Contributor):

The goal of this (which, yes, should have been better documented) is to quickly be able to prohibit fusion. But the reason we defined our own union was so that memory didn't grow as O(n^2) in the common case because many stages were able to share this set (rather than have their own copy). These changes seem to break that.

Also, could you clarify why this was made into a dict?

pabloem (Member, Author):

Hm, so this change does break that, and the memory requirements would be larger. I would think they would not be too bad, since most graphs don't have many side inputs going many places. What do you think? I'm willing to find a better solution for this, but I wonder if it's worth the extra time.

The reason this was made into a dict is to carry more information about downstream side inputs: specifically, which transforms will consume them. This is used to commit the side inputs to state after they are calculated (rather than before they are consumed), which will be necessary for streaming, because side inputs will need to be added to state as they are computed.

robertwb (Contributor):

Discussed offline, but capturing here for the record. These sets contain the transitive collection of everything downstream of any side-input consuming transform, and as such can be large even if the total number of side inputs is small. (The number of distinct such sets is about the same as the number of side inputs, so we keep the total memory use down by re-using them--to give each transform its own copy would easily be O(n^2).)

Your change of computing the side input mapping after the graph has been fused is good (and arguably better, as you only need the immediate consumers, and don't have to re-compute each time a stage is fused).
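The memory-sharing union described above can be sketched as follows. This is only an illustration of the idea, not the actual Beam helper: when one side is empty or both sides are equal, the existing frozenset is returned unchanged, so many stages can share a single set object instead of each holding an O(n) copy.

```python
def union(a, b):
    # Reuse an existing frozenset whenever possible so that fused stages
    # share one object rather than each allocating a copy (avoiding the
    # O(n^2) blow-up discussed above).
    if not a:
        return b
    if not b or a == b:
        return a
    # Only allocate a new set when the contents genuinely differ.
    return a | b
```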

@@ -914,14 +898,16 @@ def process_bundle(self,
expected_outputs, # type: DataOutput
fired_timers, # type: Mapping[Tuple[str, str], PartitionableBuffer]
expected_output_timers # type: Dict[str, Dict[str, str]]
dry_run=False
robertwb (Contributor):

This should be the default, we shouldn't have to pass it.

pabloem (Member, Author):

Done

pabloem (Member, Author) commented Apr 15, 2020:

Run Python PreCommit

1 similar comment
pabloem (Member, Author) commented Apr 15, 2020:

Run Python PreCommit

pabloem (Member, Author) commented Apr 15, 2020:

The failed test is the streaming wordcount test.

pabloem (Member, Author) commented Apr 15, 2020:

Run Python PreCommit

1 similar comment
pabloem (Member, Author) commented Apr 15, 2020:

Run Python PreCommit

pabloem (Member, Author) commented Apr 18, 2020:

Building the side input index elsewhere. LMK what you think.

pabloem (Member, Author) commented Apr 18, 2020:

Run Python PreCommit

1 similar comment
pabloem (Member, Author) commented Apr 18, 2020:

Run Python PreCommit

robertwb (Contributor) left a comment:

Thanks, LGTM.

pabloem (Member, Author) commented Apr 21, 2020:

Run Python2_PVR_Flink PreCommit

@pabloem pabloem merged commit 1fe543e into apache:master Apr 21, 2020
@pabloem pabloem deleted the fn-ref-more branch April 21, 2020 19:37
@@ -240,6 +240,30 @@ def test_multimap_side_input(self):
lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
equal_to([('a', [1, 3]), ('b', [2])]))

def test_multimap_multiside_input(self):
Boyuan (Contributor):

This test breaks Spark VR test: https://issues.apache.org/jira/browse/BEAM-9862. Please either support the same function for Spark or sickbay it.

Contributor:

Thanks for reporting Boyuan, this was a flaw with the Spark runner. Fix: #11644
