Skip to content

Comments

[BEAM-646] Get the #apply out of the DirectRunner#2006

Closed
tgroh wants to merge 3 commits intoapache:masterfrom
tgroh:surgery_ish
Closed

[BEAM-646] Get the #apply out of the DirectRunner#2006
tgroh wants to merge 3 commits intoapache:masterfrom
tgroh:surgery_ish

Conversation

@tgroh
Copy link
Member

@tgroh tgroh commented Feb 14, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

Use the Graph Surgery API in the DirectRunner.

The Forwarding View is currently required to ensure that the WriteView transform
is marked as the producer of the View proper. It is then replaced by the original view,
so the Pipeline writes the view to the correct location.

I'm going to revisit the producer-marking code soon, but this is safe to review.

I also missed invariant maintenance within TransformHierarchy within #1998,
so that's added here as well.

@tgroh
Copy link
Member Author

tgroh commented Feb 14, 2017

R: @amitsela
CC/R/etc: @kennknowles

Whaboom!

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling a8bf815 on tgroh:surgery_ish into ** on apache:master**.

@asfbot
Copy link

asfbot commented Feb 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7416/
--none--

@tgroh
Copy link
Member Author

tgroh commented Feb 15, 2017

Good news! I removed the need for the crazy ForwardingView thingy

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 69.678% when pulling 17a03e7 on tgroh:surgery_ish into 0d3389a on apache:master.

@asfbot
Copy link

asfbot commented Feb 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7426/
--none--

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little nits. Great.

.put(
PTransformMatchers.stateOrTimerParDoSingle(),
new ParDoSingleViaMultiOverrideFactory())
.put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you have a sensitivity to ordering, comment about it it. For example "ParDoSingleViaMulti comes before ParDoMultiOverrideFactory because state and splittable are only set up to work on multi" and then later "ParDoSingleViaMulti comes after ParDoMultiOverrideFactory because the resulting expansions contains yet more ParDo singles"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Most of the other things are independent of this ParDo chain, and/or are commented above.

@Override
public DirectPipelineResult run(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
defaultTransformOverrides.entrySet()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice. This is the perfect "finish line" for this API.

new DirectGroupAlsoByWindow<String, WindowedValue<KV<String, Integer>>>(
WindowingStrategy.globalDefault(), WindowingStrategy.globalDefault()))
.apply(
ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change. It tests only that the analysis works on the post-surgery graph containing real direct runner primitives.

!isSplittable(getFn()),
"%s does not support Splittable DoFn",
input.getPipeline().getOptions().getRunner().getName());
// SplittableDoFn should be forbidden on the runner-side.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there JIRAs for the other runners to reject it? Otherwise I think this will just be silently busted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 isn't SplittableDoFn to be handled by runners via SplittableParDo ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And reject otherwise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've explicitly rejected SplittableDoFn in all of the existing runners.

@amitsela
Copy link
Member

Besides joining Kenn's comment on removing assertions in ParDo, looks fine, but unless I'm missing something here it shouldn't be removed, unless this responsibility is passed to runners now, and if so it should still be as part of a separate PR (so still, not here).

Spark runner currently doesn't use apply but I already have a use for Pipeline#replace!

Remove Existing Outputs from Producer Map in replace. This permits
replacements to produce the same PValue as the transform they are
replacing, for example in CreatePCollectionView.

In replace(), add the replacement input to Unexpanded Inputs.
Remove DirectRunner#apply(). This migrates the DirectRunner to work on a
runner-agnostic graph.
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.06%) to 69.218% when pulling f965c3b on tgroh:surgery_ish into 9151676 on apache:master.

@asfbot
Copy link

asfbot commented Feb 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7508/
--none--

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nits on the error messages. LGTM when you fix them up.


if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("SparkRunner does not support SplittableDoFn: %s", doFn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message nits:

  • For proper identifiers, interpolate when possible, as in SparkRunner.class.getSimpleName()
  • SplittableDoFn is not a proper identifier. It is just a splittable DoFn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("ApexRunner does not support Splittable DoFn: %s", doFn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message nits:

  • For proper identifiers, interpolate when possible, as in ApexRunner.class.getSimpleName()
  • Splittable shouldn't be capitalized. It is just a splittable DoFn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("ApexRunner does not support Splittable DoFn: %s", doFn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message nits:

  • For proper identifiers, interpolate when possible, as in ApexRunner.class.getSimpleName()
  • Splittable shouldn't be capitalized. It is just a splittable DoFn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dibe,

DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("FlinkRunner does not currently support Splittable DoFn: %s", doFn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message nits:

  • For proper identifiers, interpolate when possible, as in FlinkRunner.class.getSimpleName()
  • Splittable shouldn't be capitalized. It is just a splittable DoFn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("FlinkRunner does not currently support Splittable DoFn: %s", doFn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message nits:

  • For proper identifiers, interpolate when possible, as in FlinkRunner.class.getSimpleName()
  • Splittable shouldn't be capitalized. It is just a splittable DoFn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("FlinkRunner does not currently support splittable DoFn: %s", fn));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the DataflowRunner. Using identifier interpolation would have caught this :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one time I don't artisinally craft a locally sourced error message I also fail to remember to fix my copypaste.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.05%) to 69.226% when pulling e6eefea on tgroh:surgery_ish into 9151676 on apache:master.

@asfbot
Copy link

asfbot commented Feb 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7512/
--none--

@asfgit asfgit closed this in 2ca3bf6 Feb 17, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants