[BEAM-2795] Use portable constructs in Flink batch translator #4343

Merged
kennknowles merged 2 commits into apache:master from bsidhom:flink-portable-batch
Jan 13, 2018

Conversation

@bsidhom (Contributor) commented Jan 4, 2018

This was tested by round-tripping batch pipelines to and from protobuf form. It works with both real Java pipelines and rehydrated pipelines.

References and downcasts to specific transform subclasses are replaced with generic PTransforms. Transform metadata is now accessed through the translation utilities under org.apache.beam.runners.core.construction.

CombineTranslation uses a new side input extractor modeled after ParDoTranslation#getSideInputs.

The RawCombine rehydrated transform exposes side inputs via getAdditionalInputs. Side inputs were not previously exposed as "additional" inputs, so FlinkBatchTranslationContext#getInput could not properly extract the main output collection when side inputs were used.

The ParDo union coder is picky about ordering. It appears that coders must appear at the same indexes as their respective output collection tags. This ordering is now preserved.
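The ordering constraint described above can be illustrated with a small self-contained sketch. This is plain Java with made-up names, not Beam's actual UnionCoder API: the point is only that a tagged value is encoded by the coder at its tag's index, so the decoding side must list the coders in exactly the same order.

```java
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for a tagged-union coder. Names are illustrative and do
// not match Beam's real UnionCoder; encode and decode must simply agree on
// the coder order.
public class UnionOrderSketch {
  public interface Coder {
    String encode(Object value);
    Object decode(String data);
  }

  public static final Coder STRING = new Coder() {
    public String encode(Object value) { return (String) value; }
    public Object decode(String data) { return data; }
  };

  public static final Coder INT = new Coder() {
    public String encode(Object value) { return Integer.toString((Integer) value); }
    public Object decode(String data) { return Integer.parseInt(data); }
  };

  // Encode a (tagIndex, value) pair using the coder at tagIndex.
  public static String encode(List<Coder> coders, int tag, Object value) {
    return tag + "|" + coders.get(tag).encode(value);
  }

  // Decode by looking up the coder at the recorded tag index.
  public static Object decode(List<Coder> coders, String data) {
    int sep = data.indexOf('|');
    int tag = Integer.parseInt(data.substring(0, sep));
    return coders.get(tag).decode(data.substring(sep + 1));
  }

  public static void main(String[] args) {
    List<Coder> writerOrder = Arrays.asList(STRING, INT);
    String data = encode(writerOrder, 1, 42);
    // Same coder order on both sides: the Integer round-trips.
    System.out.println(decode(writerOrder, data).getClass().getSimpleName()); // Integer
    // A reader whose coder list is permuted pairs tag 1 with the wrong
    // coder and produces a value of the wrong type.
    List<Coder> permuted = Arrays.asList(INT, STRING);
    System.out.println(decode(permuted, data).getClass().getSimpleName()); // String
  }
}
```

A permuted coder list does not fail loudly here; it silently decodes the wrong type, which is exactly why the index order has to be preserved across a protobuf round-trip.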

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@bsidhom (Contributor, Author) commented Jan 4, 2018

I've left a protobuf round-trip in this change to ensure that all tests pass on rehydrated pipelines. I can remove this before submitting.

I've left a few TODOs inline as questions to the reviewer.

Finally, I'm not sure whether it's stylistically appropriate to use these try-catch blocks everywhere. How is this normally handled in Beam? Should I bother with better error messages?

@kennknowles (Member) left a comment

Seems fine to me. I think @aljoscha should take a look.

(PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
"no input with tag %s",
sideInputTag);
// TODO: Should ParDoTranslation#viewFromProto live elsewhere?
Member:

Yea, seems like it could live in something like PCollectionViewTranslation

Contributor Author:

Moved.


@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
// TODO: This was ripped from ParDoTranslation. Is this correct?
Member:

Yes, this looks fine to me. Probably this could also be a helper that is shared between ParDo and Combine, since the canonical definition of how these side inputs should work is based on the composite definition of Combine as a GBK followed by a ParDo.
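A sketch of the kind of shared helper being suggested, with hypothetical names (this is not Beam's actual API): given a transform's full input map and the tags of its declared side inputs, it returns just the side inputs as the "additional" inputs, so both the ParDo and Combine translations could delegate to it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical shared helper: filters a transform's input map down to its
// side inputs. Both ParDo and Combine getAdditionalInputs() implementations
// could reuse something of this shape.
public class AdditionalInputs {
  public static <TagT, ValueT> Map<TagT, ValueT> sideInputsOnly(
      Map<TagT, ValueT> allInputs, Set<TagT> sideInputTags) {
    Map<TagT, ValueT> additional = new LinkedHashMap<>();
    for (Map.Entry<TagT, ValueT> entry : allInputs.entrySet()) {
      if (sideInputTags.contains(entry.getKey())) {
        additional.put(entry.getKey(), entry.getValue());
      }
    }
    return additional;
  }
}
```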

Contributor Author:

Where should such a helper live?

Contributor Author:

Any suggestions? Based on the size of this code, I don't think it's worth refactoring into a brand new class. I'll leave it here unless you have a better suggestion.

LOG.info(node.getTransform().getClass().toString());
throw new UnsupportedOperationException("The transform " + transform
String transformUrn = PTransformTranslation.urnForTransform(transform);
LOG.info(transformUrn);
Member:

Logging seems redundant with the exception?

Contributor:

Yes, that was probably my mistake, leaving that log output in.

Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);

TupleTag<?> mainOutputTag;
try {
Member:

Kind of cluttery having this construct everywhere. I wonder if there is a way to lift the exception catching.

Contributor Author:

Is java 8 supported by all the submodules? The only clean way I can think of is to demote the exception to runtime using a lambda wrapper.

Contributor Author:

I've gone the lambda route to address this. It's fairly general and will be needed again in the streaming translator. However, I've left it inside of FlinkBatchTransformTranslators for now because it's unclear to me whether Java 8 is allowed globally.

Please advise on a better location/name.
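The lambda wrapper being discussed might look roughly like this (illustrative names, not necessarily what was added to FlinkBatchTransformTranslators): a functional interface that permits checked exceptions, plus an adapter that rethrows them unchecked, so each translator body no longer needs its own try-catch.

```java
import java.io.IOException;
import java.util.function.Function;

// Sketch of demoting a checked exception to unchecked via a lambda wrapper.
// Requires Java 8+, which is exactly why the -source setting matters here.
public class Unchecked {
  @FunctionalInterface
  public interface ThrowingFunction<T, R> {
    R apply(T input) throws Exception;
  }

  // Wraps a throwing function so callers see only unchecked exceptions.
  public static <T, R> Function<T, R> wrap(ThrowingFunction<T, R> f) {
    return input -> {
      try {
        return f.apply(input);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }

  public static void main(String[] args) {
    ThrowingFunction<String, Integer> parse = s -> {
      if (s.isEmpty()) {
        throw new IOException("empty input");
      }
      return Integer.parseInt(s);
    };
    // No try-catch needed at the call site.
    System.out.println(wrap(parse).apply("42")); // 42
  }
}
```

Under -source 1.7 this approach is unavailable, which is the constraint that surfaces later in this thread.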

Contributor Author:

Looks like I wrote that too soon. The Maven build appears to be configured to use -source 1.7, at least for this module. I'm not sure what else to do.

@kennknowles (Member):

run flink validatesrunner

@aljoscha (Contributor) left a comment

I second @kennknowles's comments and made some of my own. Overall I think this looks good but it's currently failing the Jenkins hooks.

sideInputTag);
// TODO: Should ParDoTranslation#viewFromProto live elsewhere?
views.add(
ParDoTranslation.viewFromProto(sideInput, sideInputTag, originalPCollection, combineProto, components));
Contributor:

checkstyle violation: line's too long

Contributor Author:

I was wondering why this wasn't caught at compile time. Forgot I had been disabling checkstyle for faster builds. ;)

}
}

// TODO: Why does the UnionCoder order have to match the output map order? Why does this
Contributor:

The map of tags is created here:

And it's used again here to create individual Flink DataSets for each of the output tag indices:

for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {

If the order, i.e. the index, changes in between then the mapping to outputs won't be correct anymore.
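A rough sketch of the index mapping described here (a hypothetical helper, not the actual Flink translator code): the main output tag is pinned to index 0 and the remaining tags take the following indices, and any list of union coders has to be built in this same order.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical tag-to-index map: main output first, remaining tags in
// iteration order. A union coder list must follow these same indices.
public class OutputIndexMap {
  public static Map<String, Integer> indexMap(String mainTag, Collection<String> allTags) {
    Map<String, Integer> indices = new LinkedHashMap<>();
    indices.put(mainTag, 0); // main output is always index 0
    int next = 1;
    for (String tag : allTags) {
      if (!tag.equals(mainTag)) {
        indices.put(tag, next++);
      }
    }
    return indices;
  }
}
```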

Contributor Author:

That first link is exactly what's happening above. Unfortunately, it's not sufficient to make the union coder work properly: leaving that as-is breaks when I do a protobuf round-trip. To get the tests to pass, I have to create a union coder that lists the individual coders in the same order they appear in the output map. My question is: why is this only necessary when using a rehydrated pipeline?

Contributor:

I think it might be because rehydration messes up the order of the individual coders. I also just realised that the code that constructs the outputMap is making sure to put the main coder at index 0 while the code that constructs the lists of union coders doesn't do it. I think this just happened to work because the main-output coder always was at index 0.

Contributor:

To be more specific: I think rehydration changes the order so that the main input is no longer at index 0.

Contributor Author:

Ah, this is interesting. Is there any requirement that the rest of the outputs match the order of their respective coders?

Member:

UnionCoder requires the order of the inputs/outputs for the tags to match because the union coder encodes values in a specific order and, when reading them back, needs to decode them in that same order.

Contributor Author:

OK, thanks for confirming this. I'll leave the fix as-is and remove the comment.

@bsidhom (Contributor, Author) left a comment

Responding to comments.

The main thing to note here is that I've added a dependency on Java 8. Our current Flink dependency (1.4.0) doesn't require this, but Flink 1.5.0 will. I think it makes sense to start moving in this direction, but it's not required yet.

If we need to stick with Java 7, then I can bring the try/catch boilerplate back.

@kennknowles (Member):

We do need to stick with Java 7 until the vote concludes. But this PR could perhaps wait for that result rather than reverting.



<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
Contributor Author:

For some reason, compilation fails on my machine even with Java 8 set for the runners/flink module. I'm reverting it for now and adding a TODO to refactor it once Java 8 is the default.

@bsidhom (Contributor, Author) commented Jan 5, 2018

From what I can tell, Jenkins is failing for some reason unrelated to these changes. Let me know if this is not the case.

@bsidhom (Contributor, Author) commented Jan 5, 2018

Rebasing to see if it helps with the Jenkins issue.

@bsidhom force-pushed the flink-portable-batch branch from 8878f94 to a65e98f on January 5, 2018 22:59
@bsidhom (Contributor, Author) commented Jan 6, 2018

OK, at least some of the old failures were fixed. The Gradle build now fails due to what looks like a corrupt dependency file. The Maven build fails due to unspecified "dependency problems".

@bsidhom (Contributor, Author) commented Jan 11, 2018

run Flink ValidatesRunner

@bsidhom (Contributor, Author) commented Jan 11, 2018

Friendly ping.

@kennknowles (Member):

The result of the Flink ValidatesRunner is gone since the last commit was pushed but it was https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_ValidatesRunner_Flink/4665/ so this is as green as it gets for now.

@aljoscha (Contributor):

What's up with the pre-commit failures? Other than that I think this LGTM!

(Sorry for the tardy responses, I was traveling and I will be on vacation until end of next week.)

@bsidhom (Contributor, Author) commented Jan 12, 2018

Run Flink ValidatesRunner

@kennknowles (Member):

There's a known issue with Dataflow right now causing the WordCountIT to fail - it is because of the monster classpath containing all of the Flink, Spark, Apex, Gearpump, and Dataflow deps in the same Maven profile. Gradle runs the same tests in a better way, so we know they pass. And the Gradle build is failing only because of a known broken MqttIO test. I will sickbay all of these today; I didn't get to them yesterday.

So, yea, this is green as far as I am concerned.

@kennknowles (Member):

Ben - can you reorganize the commits into a history of meaningful changes, squashing any that were just incremental changes during development and review?

@bsidhom force-pushed the flink-portable-batch branch from a1cd53f to 5d14026 on January 12, 2018 19:59
@bsidhom (Contributor, Author) commented Jan 12, 2018

I've rebased and cleaned this up. Should be ready for merging.

@bsidhom force-pushed the flink-portable-batch branch from 5d14026 to ee95b95 on January 12, 2018 20:30
@kennknowles kennknowles assigned kennknowles and unassigned aljoscha Jan 12, 2018
@kennknowles (Member):

It does still include a commit and a revert of that commit. Seems silly to add both.

@bsidhom (Contributor, Author) commented Jan 12, 2018

I left that so it's easy enough to cherry pick the change back in when we switch to Java 8. I'll just remove it altogether.

…ation

`CombineTranslation` uses a new side input extractor modeled after
`ParDoTranslation#getSideInputs`.

The `RawCombine` rehydrated transform exposes side inputs via
`getAdditionalInputs`. Side inputs were not previously exposed as
"additional" inputs, so portable translators could not properly extract
the main output collection when side inputs were used.

`ParDoTranslation.viewFromProto` was used all over this package for
general view translations. This method has been moved into a new
`PCollectionViewTranslation` class.
This was tested by round-tripping batch pipelines to and from protobuf
form. It works with both real Java pipelines and rehydrated pipelines.

References and downcasts to specific transform subclasses are replaced
with generic `PTransform`s. Transform metadata is now accessed through
the translation utilities under
`org.apache.beam.runners.core.construction`.

The `ParDo` union coder is picky about ordering. It appears that coders
must appear at the same indexes as their respective output collection
tags. This ordering is now preserved.
@bsidhom force-pushed the flink-portable-batch branch from ee95b95 to 5ddf50b on January 12, 2018 22:25
@bsidhom (Contributor, Author) commented Jan 12, 2018

I've removed the commit/revert pair.

@kennknowles kennknowles merged commit b88d150 into apache:master Jan 13, 2018
@lukecwik (Member):

This broke the checkstyle in Flink; filed BEAM-3478 and cut PR #4410

@bsidhom bsidhom deleted the flink-portable-batch branch January 18, 2018 22:42