Liberates ReduceFnRunner from WindowingInternals, and lets WindowingInternals do windowed side outputs #1353
/**
 * Allows accessing the side inputs for a particular main input window.
 */
public interface SideInputAccess {
Isn't there already SideInputReader?
+1 seems to be the same thing.
Reminds me that SideInputReader really should be in runners-core but it is still in the SDK due to misuse in CombineFnRunners and thereabouts.
Done. It took some fiddling with WindowingInternals - note also that now SideInputReader is the only user of WindowingInternals.sideInput. I'll be able to move SideInputReader into runners-core after another PR that @peihe is working on.
The changes on the Flink side look good 👍
@@ -32,7 +32,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;

import org.joda.time.Instant;
unused.
Occurs in MultiDoFnFunction as well.
Done.
}

@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
If you're opting for enabling side output by default (leaving DoFnFunction to explicitly override with "Unsupported"), that's fine by me, but I guess DoFnFunction should override sideOutputWithTimestamp with an UnsupportedOperationException as well.
The base class delegates normal output() and sideOutput() on the ProcessContext to the internal abstract methods I introduced - outputWindowedValue and sideOutputWindowedValue (I renamed them and made them protected in response to your comments). sideOutputWindowedValue in DoFnFunction is overridden to throw an UnsupportedOperationException.
"sideOutput is an unsupported operation for doFunctions, use a "
+ "MultiDoFunction instead.");
}
This is a complementary comment to say that sideOutputWithTimestamp should throw UnsupportedOperationException as well.
Hopefully the comment above addresses this too. sideOutputWithTimestamp is defined in the base class and delegates to sideOutputWindowedValue, which here is overridden to throw the exception.
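For anyone skimming this thread, the delegation described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual runner code: SketchProcessContext and SingleOutputContext are hypothetical names, tags are plain strings, and timestamps are plain longs. Only the method names outputWindowedValue and sideOutputWindowedValue and the exception message are taken from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the base process context; not the real runner class.
abstract class SketchProcessContext<OutputT> {
  // Public entry points, as user code would call them. Each one funnels into
  // one of the two protected hooks below.
  public final void output(OutputT value) {
    outputWindowedValue(value, 0L);
  }

  public final void outputWithTimestamp(OutputT value, long timestampMillis) {
    outputWindowedValue(value, timestampMillis);
  }

  public final <T> void sideOutput(String tag, T value) {
    sideOutputWindowedValue(tag, value, 0L);
  }

  public final <T> void sideOutputWithTimestamp(String tag, T value, long timestampMillis) {
    sideOutputWindowedValue(tag, value, timestampMillis);
  }

  // Subclasses decide what emitting to the main output or a side output means.
  protected abstract void outputWindowedValue(OutputT value, long timestampMillis);

  protected abstract <T> void sideOutputWindowedValue(String tag, T value, long timestampMillis);
}

// Hypothetical single-output subclass: main output is collected, side output is rejected.
class SingleOutputContext<OutputT> extends SketchProcessContext<OutputT> {
  final List<OutputT> outputs = new ArrayList<>();

  @Override
  protected void outputWindowedValue(OutputT value, long timestampMillis) {
    outputs.add(value);
  }

  @Override
  protected <T> void sideOutputWindowedValue(String tag, T value, long timestampMillis) {
    // One override covers both sideOutput and sideOutputWithTimestamp.
    throw new UnsupportedOperationException(
        "sideOutput is an unsupported operation for doFunctions, use a "
            + "MultiDoFunction instead.");
  }
}
```

Because both public side-output methods go through the same hook, overriding that single hook is enough to reject every side-output path, which is the point made in the reply above.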
}

public abstract void output(WindowedValue<OutputT> output);
public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output);
Whitespace between the abstract methods?
Added.
It seems that neither the Spark nor the Flink runner was actually built by Jenkins, as the Direct runner failed on "unused imports" in checkstyle. I've added some comments for the changes in the Spark runner, and I've also built the PR branch locally (ignoring checkstyle) and executed the integration tests for
@@ -496,4 +501,51 @@ public Timers timers() {
return timers;
}
}

private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents(
Why move these?
They are only used by ReduceFnContextFactory (that's why I made them private to this class), and in any case they belong in runners-core rather than in the SDK - especially now that this method takes SideInputReader, which I would like to move into runners-core.
Thanks all, PTAL!
LGTM pending Jenkins/Travis.
This is quite good. My comments are just slightly more than nits.
@Override
public <T> T sideInput(PCollectionView<T> view) {
return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow));
We had some discussions, and yes this is right.
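To make the lookup in that snippet concrete, here is a rough, self-contained sketch of resolving a side input through the side input window rather than the main input window. Every type here is hypothetical (SketchWindow, SketchSideInputWindowFn, SketchSideInputReader, SketchSideInputAccess). It also assumes that the side input uses fixed windows and that the mapping picks the side input window containing the last timestamp of the main input window; the real WindowFn may map windows differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified window: the half-open interval [startMillis, endMillis).
final class SketchWindow {
  final long startMillis;
  final long endMillis;

  SketchWindow(long startMillis, long endMillis) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof SketchWindow)) {
      return false;
    }
    SketchWindow that = (SketchWindow) other;
    return startMillis == that.startMillis && endMillis == that.endMillis;
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(startMillis) + Long.hashCode(endMillis);
  }
}

// Assumption: the side input is windowed into fixed windows of sizeMillis.
final class SketchSideInputWindowFn {
  private final long sizeMillis;

  SketchSideInputWindowFn(long sizeMillis) {
    this.sizeMillis = sizeMillis;
  }

  // Maps a main input window to the side input window containing its last timestamp.
  SketchWindow getSideInputWindow(SketchWindow mainInputWindow) {
    long lastTimestamp = mainInputWindow.endMillis - 1;
    long start = lastTimestamp - Math.floorMod(lastTimestamp, sizeMillis);
    return new SketchWindow(start, start + sizeMillis);
  }
}

// Stand-in for a side input reader: values are keyed by side input window only.
final class SketchSideInputReader {
  private final Map<SketchWindow, String> valuesByWindow = new HashMap<>();

  void put(SketchWindow sideInputWindow, String value) {
    valuesByWindow.put(sideInputWindow, value);
  }

  String get(SketchWindow sideInputWindow) {
    return valuesByWindow.get(sideInputWindow);
  }
}

// Mirrors the shape of the sideInput method under review.
final class SketchSideInputAccess {
  private final SketchSideInputReader reader;
  private final SketchSideInputWindowFn windowFn;
  private final SketchWindow mainInputWindow;

  SketchSideInputAccess(
      SketchSideInputReader reader, SketchSideInputWindowFn windowFn, SketchWindow mainInputWindow) {
    this.reader = reader;
    this.windowFn = windowFn;
    this.mainInputWindow = mainInputWindow;
  }

  String sideInput() {
    // The reader never sees the main input window, only the mapped side input window.
    return reader.get(windowFn.getSideInputWindow(mainInputWindow));
  }
}
```

In this sketch a one-minute main input window is looked up in whichever ten-minute side input window contains its final millisecond, so the reader itself needs no knowledge of the main input windowing.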
SideOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {}
Seems like this (and the one above, which is a duplicate) should both throw. A DoFn is not permitted to output to a different window. Silently dropping data seems a cruel way to enforce this.
@Override
public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
Don't align. Use google-java-format.
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
throw new UnsupportedOperationException();
Actionable error message, please. This might be a checkState if it is unreachable.
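As a rough illustration of the distinction drawn here, the sketch below contrasts an unsupported operation that user code can reach (throw with a message that names what was attempted) with a condition believed to be unreachable (a checkState-style invariant check). Everything in it is hypothetical: SketchPreconditions is a local stand-in so the example needs no external library, and SketchOutputManager and its messages are invented for illustration.

```java
// Local stand-in for a checkState-style helper, so the sketch needs no external library.
final class SketchPreconditions {
  private SketchPreconditions() {}

  static void checkState(boolean expression, String message) {
    if (!expression) {
      throw new IllegalStateException(message);
    }
  }
}

// Hypothetical output sink; the names and messages are illustrative only.
final class SketchOutputManager {
  // Reachable from user code, so the message names the tag and says what is unsupported.
  <T> void sideOutputWindowedValue(String tag, T value) {
    throw new UnsupportedOperationException(
        "Cannot output value to side output '" + tag + "': side outputs are not supported here.");
  }

  // Believed unreachable: a violation is a broken invariant rather than an unsupported call.
  void outputToWindow(String elementWindow, String targetWindow) {
    SketchPreconditions.checkState(
        elementWindow.equals(targetWindow),
        "Expected output window " + elementWindow + " but got " + targetWindow + ".");
  }
}
```

Either way the caller gets an exception that explains itself, instead of a bare UnsupportedOperationException or silently dropped data.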
@Override
public <T> boolean contains(PCollectionView<T> view) {
throw new UnsupportedOperationException();
ditto
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
ditto
@Override
public <T> boolean contains(PCollectionView<T> view) {
throw new UnsupportedOperationException();
ditto
Can you add an informative message to this exception?
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
ditto
Can you add an informative message to this exception?
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn");
Let's call it GroupAlsoByWindow, since actually that is the criterion. If ReduceFn ever becomes a thing, it might be allowed to side output. In fact, stateful DoFn is basically ReduceFn.
Just to direct your attention: the Jenkins failure is checkstyle in various Flink files. It runs before tests, to fail fast. The changes here will almost certainly fail spectacularly if they are incorrect, so I am not trying to replace testing by my manual inspection.
Force-pushed: 4db5bb4 to 1325673
Dang. Looks like I can't just move StateContexts.createFromComponents away from the SDK. The Jenkins failure is: (546b2b14867c0a52): Exception: java.lang.NoSuchMethodError: org.apache.beam.sdk.util.state.StateContexts.createFromComponents(Lorg/apache/beam/sdk/options/PipelineOptions;Lorg/apache/beam/sdk/util/WindowingInternals;Lorg/apache/beam/sdk/transforms/windowing/BoundedWindow;)Lorg/apache/beam/sdk/util/state/StateContext; Let's see what I can do about this...
Force-pushed: 765cc77 to e230c9d
Green, PTAL.
Makes WindowingInternals.sideInput take the side input window instead of main input window.
It must be temporarily restored for compatibility with current Dataflow worker in order for integration tests to pass.
LGTM with the exception of the missing error messages.
@Override
public <T> boolean contains(PCollectionView<T> view) {
throw new UnsupportedOperationException();
Can you add an informative message to this exception?
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
Can you add an informative message to this exception?
Force-pushed: e230c9d to c0623c1
Done.
R: @kennknowles (for bulk of the code and as committer)
CC: @aljoscha @amitsela (for the minor refactorings in respective runners)