-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2898] Support Impulse transforms in Flink batch runner #4783
Conversation
R: @tgroh |
* safely use an unwrapped SerializableCoder because | ||
* {@link SerializableCoder#structuralValue(Serializable)} assumes that coded elements support | ||
* object equality. By default, Coders compare equality by serialized bytes, which we want in | ||
* this case. It is usually safe to depend on coded representation here because we only compare |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"usually safe..." isn't really necessary.
Probably worth noting that we usually expect Source equals
methods to not be defined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
/** | ||
* Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s. | ||
* | ||
* <p>This fuser expects each PTransform to have exactly one input. This means that pipelines must |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not true; Flatten
is permitted, as is ParDo
with side inputs.
However, it does expect all transforms with no inputs to not have an associated environment, which is what forces this requirement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
@Override | ||
public WindowedValue<byte[]> nextRecord(WindowedValue<byte[]> windowedValue) { | ||
hasOutput = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name really confused me for a while. You're using it like "This split/format has output within it, which has not been produced" and I read it as "This split/format has produced output", which, of course, mean the opposite things.
'containsOutput' or 'availableOutput', perhaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I actually struggled to come up with a succinct yet descriptive name. Apparently this failed in the end... ;)
Updated to availableOutput
and added a comment for clarity.
@Test | ||
@Category({ValidatesRunner.class, UsesImpulse.class}) | ||
public void testImpulseRead() { | ||
PCollection<Integer> result = p.apply(JavaReadViaImpulse.bounded(Source.of(1, 2, 3))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just a ParDo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will pipeline construction always insert an Impulse in that case? Or do I need to insert one myself? I want to ensure that runners can actually handle Impulse
proper nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is just insert an Impulse
then a ParDo
after it, instead of wrapping a Source
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I've added another test with a direct Impulse
-> ParDo
. Let me know if this looks good and I'll delete the source-based test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seeks to work fine and tests what we want to test, so I've gone ahead and removed the source-based test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a raw Impulse test and rebased.
@Test | ||
@Category({ValidatesRunner.class, UsesImpulse.class}) | ||
public void testImpulseRead() { | ||
PCollection<Integer> result = p.apply(JavaReadViaImpulse.bounded(Source.of(1, 2, 3))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I've added another test with a direct Impulse
-> ParDo
. Let me know if this looks good and I'll delete the source-based test.
You'll need to rebase |
* object equality. By default, Coders compare equality by serialized bytes, which we want in | ||
* this case. It is usually safe to depend on coded representation here because we only compare | ||
* objects on bundle commit, which compares serializations of the same object instance. | ||
* object equality. Sources in general do not support object equality. By default, Coders compare |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though with #4817 I'm no longer hugely concerned about the coder stuff. I wouldn't revert it, however - certainly, the StructuralValue
is better, even if it's not going to be used anywhere relevant, and the comment seems sufficient for the time being.
* use Impulse/ParDo transformations rather than read nodes. The utilities in | ||
* {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate | ||
* non-compliant pipelines. | ||
* <p>This fuser expects each PTransform which has no inputs to have an associated environment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/an associated environment/no associated environment
* {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate | ||
* non-compliant pipelines. | ||
* <p>This fuser expects each PTransform which has no inputs to have an associated environment. | ||
* This means that pipelines must use Impulse/ParDo transformations rather than read nodes. The |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"This means that pipelines must be rooted at Impulse, or other runner-executed primitive transforms, instead of primitive Read nodes."
@@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) { | |||
fusePipeline(groupSiblings(rootConsumers)); | |||
} | |||
|
|||
/** | |||
* Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{@link ExecutableStage ExecutableStages}
is our normal style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* must be rooted at Impulse, or other runner-executed primitive transforms, instead of primitive | ||
* Read nodes. The utilities in | ||
* {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate | ||
* non-compliant pipelines. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does kind of have an associated TODO
for unbounded reads; https://issues.apache.org/jira/browse/BEAM-3859 is the (just-authored) issue to link against.
'can be used to translate non-compliant pipelines -> can be used to convert bounded pipelines using the Read
primitive.'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -106,6 +149,13 @@ public void testOutputCoder() { | |||
equalTo(BigEndianIntegerCoder.of())); | |||
} | |||
|
|||
private static void assertNotReadTransform(PTransform<?, ?> transform) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be inlined into the anonymous subclass that you'er traversing the pipeline with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, this is already submitted, though I could change it here if you want.
PCollection<Long> input = p.apply(Read.from(source)); | ||
PAssert.that(input).containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); | ||
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); | ||
p.traverseTopologically(new Pipeline.PipelineVisitor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extend PipelineVisitor.Defaults, and get rid of enterPipeline
, visitValue
, and leave*
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually already in master and was just one of the dependent changes.
|
||
@Override | ||
public WindowedValue<byte[]> nextRecord(WindowedValue<byte[]> windowedValue) { | ||
availableOutput = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be a checkArgument
before you mutate the state (minor preference, no strong feelings)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
sdks/java/core/build.gradle
Outdated
@@ -66,6 +66,7 @@ dependencies { | |||
shadow library.java.junit | |||
shadow "org.tukaani:xz:1.5" | |||
shadowTest project(":model:fn-execution").sourceSets.test.output | |||
shadowTest project(path: ":runners:core-construction-java", configuration: "shadow") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What introduced this edge?
This also can't be represented in maven, and right now that worries me (though hopefully less soon)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed anymore. Thanks for pointing it out.
Review comments addressed. PTAL. |
run java precommit |
1 similar comment
run java precommit |
It looks like the precommits that are actually being run are passing. |
Fusion will be done via
GreedyPipelineFuser
, which does not currently support sourceless Read transforms. Instead, reads must happen viaImpulse
followed by aParDo
.This change adds impulse support to the batch runner and a new
ValidatesRunner
ImpulseTest
.Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue.mvn clean verify
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.