Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2333] Go to proto and back before running a pipeline in Java DirectRunner #3334

Merged
merged 4 commits into from
Jul 25, 2017

Conversation

kennknowles
Copy link
Member

@kennknowles kennknowles commented Jun 8, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify.
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

This turn a Pipeline into a protobuf message and then decodes it prior to running it with the Java DirectRunner. Everything is still hardcoded to Java, but this proves that there is adequate information in the protobuf to correctly run a pipeline.

@kennknowles kennknowles force-pushed the Pipeline-rehydrate branch 2 times, most recently from eff523d to 8dd7dfb Compare June 10, 2017 03:30
@kennknowles kennknowles force-pushed the Pipeline-rehydrate branch 3 times, most recently from 113973e to f4b11b3 Compare July 7, 2017 05:13
@kennknowles
Copy link
Member Author

Seems to pass most tests (like all of ParDoTest etc), but mixes up composites/primitives for pipelines in PipelineTest that have composites with no primitives within them (for example returning their input directly or assembling PCollectionTuples, etc)

@coveralls
Copy link

Coverage Status

Coverage increased (+0.008%) to 70.868% when pulling cd648c4 on kennknowles:Pipeline-rehydrate into 440c7d4 on apache:master.

@kennknowles
Copy link
Member Author

The only remaining failure is DisplayData, which is simply a TODO, and then turning the hacks into better designs.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.1%) to 70.751% when pulling 6cae654 on kennknowles:Pipeline-rehydrate into 0e89df3 on apache:master.

@kennknowles
Copy link
Member Author

R: @tgroh

There are a couple of hacks left in here:

  • Determining if something is a primitive via set difference "outputs minus inputs"
  • Hardcoding the fact that ParDo is the only transform where evaluating it correctly uses additional inputs (specifically, non-additional inputs). I think the right way to do this is probably to just put the main input in the payload and have the evaluator use that.
  • Hitting the DisplayData methods to get them to crash, to not regress validation, while leaving them out of the proto since it isn't on the critical path. Developing the actual translation in parallel.

But it still seems like a decent milestone to perhaps checkpoint where things are with the DirectRunner, depending on how egregious you find said hacks. Suggestions welcome.

@kennknowles kennknowles force-pushed the Pipeline-rehydrate branch 2 times, most recently from bbeba80 to 735673b Compare July 18, 2017 04:39
@kennknowles
Copy link
Member Author

kennknowles commented Jul 18, 2017

@tgroh @jkff I have a hunch that TextIOTest has been succeeding only because the pipeline is mutated before run. This PR first clones before run, causing stable unique name violations in TextIOReadTest.

@kennknowles
Copy link
Member Author

Yup, hacking uniqueness works. Probably we should fix it. Filed https://issues.apache.org/jira/browse/BEAM-2632 as a starter ticket.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.08%) to 70.53% when pulling 182fdd7 on kennknowles:Pipeline-rehydrate into 7c36318 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.08%) to 70.53% when pulling 182fdd7 on kennknowles:Pipeline-rehydrate into 7c36318 on apache:master.

@@ -274,7 +317,8 @@ public Node getCurrent() {

private final String fullName;

// Nodes for sub-transforms of a composite transform.
// Nodes for sub-transforms of a composite transform, in shallow topological
// order so visiting in this order yields a recursively topological traversal.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not strictly required; visit will ensure that the topological order is respected.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it? That's new. Great! Reverted.

@@ -296,15 +297,18 @@ private static void writeToStreamAndClose(List<String> lines, OutputStream outpu
private void assertReadingCompressedFileMatchesExpected(
File file, CompressionType compressionType, List<String> expected) {

int thisUniquifier = ++uniquifier;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be pulled out as an independent change?

RehydratedPTransform.of(
transformSpec.getUrn(), transformSpec.getParameter(), additionalInputs);

// HACK: A primitive is something with outputs that are not in its input
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is as hacky as this comment claims, as we discussed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth factoring into an isPrimitive method, though

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. And reversed polarity of if accordingly.


@Override
public void visitPrimitiveTransform(Node node) {
// TODO: Include DisplayData in the proto
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include a stub implementation that puts in an empty DisplayData (including a DisplayDataTranslation class, so once it's implemented it's already wired in?

An explicit JIRA for "Implement Runner API Display Data" is probably worthwhile as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Moved that to PTransformTranslation. Filed BEAM-2645.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving the TODO to that location as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.withAllowedLateness(Duration.standardMinutes(3L)));
final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
PCollection<KV<String, Iterable<Long>>> grouped =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth including a combine and something with side inputs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now. Minor change to the verification. We discussed that the verification could be made both more thorough and less brittle by doing something other than counting, but I'd love to do that in a follow-up...

} catch (IOException exc) {
throw new IllegalArgumentException(
String.format(
"%s received transform that did not contain a Java-serialized %s: %s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That specific reason seems like an exception that should be thrown out of the getWindowFn call, as it's returning an object of type WindowFn; This should just report that it couldn't get the WindowFn out of the transform and attach the cause.

* @param inputs the expanded inputs to the transform
* @param outputs the expanded outputs of the transform
*/
private Node(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should finalized be set to true here? all places we use it we then immediately set the bit to true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion. I think it is a bit uniform and readable as-is.

Copy link
Member Author

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also going to break out some of these sub-pieces for quick-and-easy reviews.


@Override
public void visitPrimitiveTransform(Node node) {
// TODO: Include DisplayData in the proto
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Moved that to PTransformTranslation. Filed BEAM-2645.

RehydratedPTransform.of(
transformSpec.getUrn(), transformSpec.getParameter(), additionalInputs);

// HACK: A primitive is something with outputs that are not in its input
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. And reversed polarity of if accordingly.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.08%) to 70.497% when pulling 8cca62b on kennknowles:Pipeline-rehydrate into 81c2e90 on apache:master.

@kennknowles
Copy link
Member Author

kennknowles commented Jul 22, 2017

Seem this PR lost a race with some changes to WriteFiles that cause NPEs in the display data because filenamePolicy ends up null, perhaps due to serialization. @jkff @reuvenlax does this sound familiar?

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.09%) to 70.491% when pulling 12a285f on kennknowles:Pipeline-rehydrate into 81c2e90 on apache:master.

@kennknowles
Copy link
Member Author

Turns out they were just broken tests actually passing null for non-nullable values. And we don't aggressively check for null because the modern presumption is that nothing is nullable unless marked. We should probably be more defensive about user input, though.

// Only ParDo really separates main from side inputs
Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();

// TODO: move this ownership into the ParDoTranslator
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bothers me a bit. I think we've discussed and the obvious plan is to also have ParDoTranslator registered by URN to do the rehydration, rather than one uniform class for RehydratedPTransform.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's sensible. Doesn't need to happen here, but it's likely worth filing a JIRA to follow-up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

additionalInputs = PCollectionViews.toAdditionalInputs(views);
}

// TODO: move this ownership into the CombineTranslator
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment for PAR_DO. This is another case where we want a custom rehydration to produce a RawPTransform that knows what components to register. In this case, the issue is not correct execution, but correct re-dehydration. A bit of a corner case, but should be designed right, and helpful for testing.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 5c293f8 on kennknowles:Pipeline-rehydrate into ** on apache:master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 5c293f8 on kennknowles:Pipeline-rehydrate into ** on apache:master**.


@Override
public void visitPrimitiveTransform(Node node) {
// TODO: Include DisplayData in the proto
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving the TODO to that location as well.

/** Utilities for going to/from DisplayData protos. */
public class DisplayDataTranslation {
public static RunnerApi.DisplayData toProto(DisplayData displayData) {
// TODO https://issues.apache.org/jira/browse/BEAM-2645
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider putting in an "isStubImplementation=true" to signal to anyone looking at the result that it's not empty because it was actually empty.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Only ParDo really separates main from side inputs
Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();

// TODO: move this ownership into the ParDoTranslator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's sensible. Doesn't need to happen here, but it's likely worth filing a JIRA to follow-up.

.withAllowedLateness(Duration.standardMinutes(3L)));
final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
PCollection<KV<String, Iterable<Long>>> grouped =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this done?

@asfgit asfgit merged commit 8ca4591 into apache:master Jul 25, 2017
asfgit pushed a commit that referenced this pull request Jul 25, 2017
…pipeline in Java DirectRunner

  Dehydrate then rehydrate Pipeline before DirectRunner.run()
  Add Pipeline rehydration from proto
  Fix tests that passed invalid input to DynamicDestinations
  Add stub DisplayDataTranslation
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.09%) to 70.503% when pulling 8ca4591 on kennknowles:Pipeline-rehydrate into 0064fb3 on apache:master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants