[NEMO-216,251,259] Support side inputs and windowing #159
@johnyangk Thanks for the work! I've done the first review and left some comments. I'm wondering why we should emit `WatermarkWithIndex` in the source, because the index in `WatermarkWithIndex` is the inter-task edge index. Also, I've left some comments about `SideInputTransform` and the index within that class.
}

@Override
public <T> boolean contains(final PCollectionView<T> view) {
public <T> boolean contains(PCollectionView<T> view) {
final

/**
 * A sideinput reader that reads/writes side input values to context.
Please add comments.
BroadcastVariableSideInputReader(final Transform.Context context,
                                 final Collection<PCollectionView<?>> sideInputs) {
  this.context = context;
public MaterializedViewReader(final List<PCollectionView<?>> sideInputs) {
Why `List` and not `Collection`?
  return sideInputs.contains(view);
}

@Override
public boolean isEmpty() {
  return sideInputs.isEmpty();
}

public <T> void addView(final PCollectionView<T> view,
addMaterializedViewData
final PCollectionView view = entry.getValue();

final IRVertex srcVertex = pValueToProducerVertex.get(view);
final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index));
Why don't you use the `view` instead of the `index`? I think using the `view` is more explicit.
SideInputElement(Int, Object) is cheaper to encode than SideInputElement(PCollectionView, Object)
(The Flink runner also uses Int rather than PCollectionView)
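A minimal sketch of the trade-off described in this reply, not the PR's actual code: each side-input element carries only a small integer, and the receiving side maps that integer back to the view through a table built once up front. `ViewHandle`, `IndexedSideInput`, and `SideInputIndexResolver` are hypothetical names; `ViewHandle` merely stands in for a Beam `PCollectionView`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a PCollectionView; only identity matters here. */
final class ViewHandle {
  final String name;

  ViewHandle(final String name) {
    this.name = name;
  }
}

/** A side-input element tagged with an int index instead of the view object. */
final class IndexedSideInput<T> {
  private final int viewIndex;
  private final T value;

  IndexedSideInput(final int viewIndex, final T value) {
    this.viewIndex = viewIndex;
    this.value = value;
  }

  int getViewIndex() {
    return viewIndex;
  }

  T getValue() {
    return value;
  }
}

/** Receiver side: resolves the index back to the view via a pre-built map. */
final class SideInputIndexResolver {
  private final Map<Integer, ViewHandle> indexToView = new HashMap<>();

  void register(final int index, final ViewHandle view) {
    indexToView.put(index, view);
  }

  ViewHandle resolve(final IndexedSideInput<?> element) {
    final ViewHandle view = indexToView.get(element.getViewIndex());
    if (view == null) {
      throw new IllegalArgumentException("Unknown side input index: " + element.getViewIndex());
    }
    return view;
  }
}
```

With this shape only the int and the payload need to be serialized per element; the cost is that the index-to-view table must be kept consistent between the producer and the consumer.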
@@ -205,6 +212,11 @@ public TimerInternals timerInternals() {
      inputCoder,
      outputCoders,
      windowingStrategy);

    pushBackRunner = SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(
It would be good to use this runner only if the `DoFn` handles side inputs. If there are no side inputs, using the plain `doFnRunner` would be better.
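A rough sketch of this suggestion, under the assumption that the choice can be made once when the transform is prepared. `SketchRunner`, `PlainRunner`, `PushbackRunner`, and `RunnerSelector` are hypothetical names used only for illustration, not Beam or Nemo classes.

```java
import java.util.Collection;

/** Hypothetical minimal runner interface for this sketch. */
interface SketchRunner {
  String name();
}

/** Stands in for the plain DoFn runner. */
final class PlainRunner implements SketchRunner {
  @Override
  public String name() {
    return "plain";
  }
}

/** Stands in for a push-back runner that wraps the plain runner. */
final class PushbackRunner implements SketchRunner {
  private final SketchRunner underlying;

  PushbackRunner(final SketchRunner underlying) {
    this.underlying = underlying;
  }

  @Override
  public String name() {
    return "pushback(" + underlying.name() + ")";
  }
}

final class RunnerSelector {
  private RunnerSelector() {
  }

  /** Wraps the plain runner only when there is at least one side input. */
  static SketchRunner select(final Collection<?> sideInputs) {
    final SketchRunner plain = new PlainRunner();
    return sideInputs.isEmpty() ? plain : new PushbackRunner(plain);
  }
}
```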
@@ -112,20 +113,12 @@ public void onWatermark(final Watermark inputWatermark) {
  @Override
  public void close() {
    onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

    if (!isEmitted) {
How do we handle this situation if it is removed?
Is NEMO-291 handled in this PR?
I think so (PerPercentile test passes)
It will be handled more explicitly in https://github.com/apache/incubator-nemo/pull/159/files#diff-30a095310e20d5a90938099d206b1757R32
 * Side input transform implementation.
 * @param <T> input/output type.
 */
public final class SideInputTransform<T> implements Transform<WindowedValue<T>, SideInputElement> {
I think this class can be removed if we use the `view` instead of the `index`. We can emit the materialized data in `CreateViewTransform` together with the `view`.
You're right. However, I'm concerned about the cost of encoding a `PCollectionView` per side input (of which there can be many with small windows).
A `SimplePCollectionView` object has 6 fields, which appear to be expensive to encode.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L339
@johnyangk I agree with your concern, but I think the cost is negligible because creating a view is not a frequent operation. This is an optimization for a special case, not the common case (main input), so I prefer to use `PCollectionView` and keep the code more explicit and concise.
@@ -33,6 +33,7 @@
   * Constructor.
   */
  public DedicatedKeyPerElementPartitioner() {
    // TODO should not be 0
?
@@ -104,7 +107,8 @@ private boolean isWatermarkTriggerTime() {
  private Object retrieveElement() throws NoSuchElementException, IOException {
    // Emit watermark
    if (!bounded && isWatermarkTriggerTime()) {
      return new Watermark(readable.readWatermark());
      // index=0 as there is only 1 input stream
      return new WatermarkWithIndex(new Watermark(readable.readWatermark()), 0);
Why do we emit this here? The index is the outgoing edge index between tasks (inter-task).
I'll take a look at this.
Thanks @taegeonum. I've addressed your comments.
I'm working on fixing some ITCase failures now. I'll also run the TPC-H benchmark to check performance and share the results in this thread.
 * TODO: users should not use this.
 */
public final class SideInputElement<T> {
  private final int viewIndex;
SideInputElement(Int, Object) is cheaper to encode than SideInputElement(PCollectionView, Object)
(The Flink runner also uses Int rather than PCollectionView)
private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());

private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> additionalOutputTags;
private final Collection<PCollectionView<?>> sideInputs;
private final Map<Integer, PCollectionView<?>> sideInputs;
We can avoid encoding PCollectionViews.
I ran the TPC-H benchmark with this PR and there was no meaningful performance degradation (probably thanks to branch prediction in the DoFnTransform).
@taegeonum I've fixed the tests and made the following changes. Please take a look. 😄
@taegeonum (I think) I've also fixed the empty side input issue.
@johnyangk Thanks! I've left some comments. Could you please take a look at them?

/**
 * Accumulates and provides side inputs in memory.
 * TODO #290: Handle OOMs in InMemorySideInputReader
Support the removal of side input data
  return sideInputsToRead.isEmpty();
}

public <T> void addSideInputValue(final PCollectionView<T> view,
Please add comments
  }
}

public void trackCurWatermark(final long newWatermark) {
Please add comments
  }
}

public void trackCurWatermark(final long newWatermark) {
`trackCurWatermark` -> `setCurrentWatermark`
             final Coder elementCoder,
             final Coder windowCoder) {
void addEdge(final IREdge edge, final Coder elementCoder, final Coder windowCoder) {
  // TODO key extractor only when many to many
Please add the JIRA issue number.
@Override
public void onData(final Object data) {
  // Need to distinguish side/main inputs and push-back main inputs.
  if (data instanceof SideInputElement) {
if (windowedvalue.getValue() instanceof ...)
@Override
public void onWatermark(final Watermark watermark) {
  curInputWatermark = watermark.getTimestamp();
  getSideInputHandler().trackCurWatermark(curInputWatermark);
How do we guarantee that this watermark is created from `SideInputTransform`?
Changed the method name to setCurrentWatermarkOfAllMainAndSideInputs
curInputWatermark = watermark.getTimestamp();
getSideInputHandler().trackCurWatermark(curInputWatermark);

final long minOfInputAndPushback = Math.min(curInputWatermark, curPushedBackWatermark);
outputWatermarkCandidate
We also should process pushBackElements whenever receiving watermarks from side input
}

if (watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
  hardFlushAllPushedbacks();
No need to do it if we handle pushBackElements when receiving watermarks
Changed to watermark + handlepushback (as per the discussion)
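A simplified sketch of the "watermark + handle pushback" idea, assuming each pushed-back element can be represented by its timestamp alone. On every watermark, pushed-back elements are retried, and the output watermark is held back by the earliest element that is still waiting. `PushbackWatermarkTracker` and the `isReady` predicate are hypothetical; the field names mirror the ones in the diff above only for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

final class PushbackWatermarkTracker {
  private List<Long> pushedBackTimestamps = new ArrayList<>();
  private long curInputWatermark = Long.MIN_VALUE;
  private long curPushedBackWatermark = Long.MAX_VALUE;

  /** Records an element that could not be processed because its side input was not ready. */
  void pushBack(final long timestamp) {
    pushedBackTimestamps.add(timestamp);
    curPushedBackWatermark = Math.min(curPushedBackWatermark, timestamp);
  }

  /**
   * Retries pushed-back elements and returns the candidate output watermark.
   * isReady is a hypothetical predicate: true if the element with that timestamp
   * can be processed now.
   */
  long onWatermark(final long watermark, final LongPredicate isReady) {
    curInputWatermark = watermark;

    final List<Long> pushedBackAgain = new ArrayList<>();
    long pushedBackAgainWatermark = Long.MAX_VALUE;
    for (final long timestamp : pushedBackTimestamps) {
      if (!isReady.test(timestamp)) {
        // Still waiting: keep the element and let it hold the watermark back.
        pushedBackAgain.add(timestamp);
        pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, timestamp);
      }
    }
    pushedBackTimestamps = pushedBackAgain;
    curPushedBackWatermark = pushedBackAgainWatermark;

    return Math.min(curInputWatermark, curPushedBackWatermark);
  }
}
```

Because every watermark triggers a retry, a final watermark at the maximum timestamp needs no separate flush path as long as all side inputs are ready by then.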
@@ -99,35 +96,14 @@ Object fetchDataElement() throws IOException, NoSuchElementException {
  private void fetchDataLazily() {
    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
    numOfIterators = futures.size();

    if (numOfIterators > 1) {
Why is it removed? Is it related to this PR, or is it just refactoring?
I've reverted the code 😄, considering that the original makes the InputWatermarkManager come "in front" of the data fetcher and hides indices from TaskExecutor.
The disadvantages are slightly more complex code and having to handle indices in each DataFetcher.
Thanks @taegeonum. I've addressed your comments.

@Override
public void onData(final WindowedValue<T> element) {
  outputCollector.emit(new SideInputElement(index, element));
Thanks! I was looking for something like this....
// TODO #287: Consider Explicit Multi-Input IR Transform

// Flush out any current bundle-related states in the DoFn,
// as this sideinput may trigger the processing of pushed-back data.
I've added the following comment.
// Force-flush, as this sideinput may trigger the processing of pushed-back data.
//
// Main reason:
// {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
// caches for each bundle the side inputs that are not ready.
// We need to re-start the bundle to advertise the newly available side input.
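To illustrate why a restart is needed, here is a deliberately simplified model of per-bundle "not ready" caching. It is not Beam's `SimplePushbackSideInputDoFnRunner`: `CachingPushbackRunner` is a hypothetical class, and side inputs are keyed by plain strings purely for brevity.

```java
import java.util.HashSet;
import java.util.Set;

final class CachingPushbackRunner {
  // Side inputs that have actually arrived (shared across bundles).
  private final Set<String> availableViews = new HashSet<>();
  // Side inputs found "not ready" during the current bundle (per-bundle cache).
  private final Set<String> notReadyInBundle = new HashSet<>();

  void startBundle() {
    notReadyInBundle.clear();
  }

  void finishBundle() {
    notReadyInBundle.clear();
  }

  void addSideInput(final String view) {
    availableViews.add(view);
  }

  /** Returns true if an element needing requiredView can be processed now. */
  boolean tryProcess(final String requiredView) {
    if (notReadyInBundle.contains(requiredView)) {
      // Cached decision: the availability check is skipped for the rest of the bundle.
      return false;
    }
    if (!availableViews.contains(requiredView)) {
      notReadyInBundle.add(requiredView);
      return false;
    }
    return true;
  }
}
```

In this model a side input that arrives mid-bundle stays invisible until the bundle is finished and a new one is started, which is the behaviour the force-flush works around.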
For the remaining comments. 😄
if (bundleFinished) {
  bundleFinished = false;
  doFnRunner.startBundle();
  if (pushBackRunner == null) {
I think it's better to maintain a single method for doFnRunner/pushBackRunner, as bundle-related methods are called from multiple different places.
@johnyangk I've left some comments
if (bundleFinished) {
  bundleFinished = false;
  doFnRunner.startBundle();
  if (pushBackRunner == null) {
It looks like we use different start/finish bundle strategies for the simple DoFnRunner and the pushBackRunner. Separating them into different methods would be good.
/**
 * Finish bundle without checking for conditions.
 */
final void forceFinishBundle() {
This is only used for PushBackRunner.
This is also used in AbstractDoFnTransform#close
final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
long pushedBackAgainWatermark = Long.MAX_VALUE;
for (WindowedValue<InputT> curPushedBack : curPushedBacks) {
  final Iterable<WindowedValue<InputT>> pushedBack =
`checkAndInvokeBundle` here?
for (WindowedValue<InputT> curPushedBack : curPushedBacks) {
  final Iterable<WindowedValue<InputT>> pushedBack =
      getPushBackRunner().processElementInReadyWindows(curPushedBack);
  for (final WindowedValue<InputT> wv : pushedBack) {
`checkAndFinishBundle` here?
// {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
// caches for each bundle the side inputs that are not ready.
// We need to re-start the bundle to advertise the (possibly) newly available side input.
forceFinishBundle(); // forced
Why don't you just call `pushBackRunner.startBundle` before handlePushBacks and `pushBackRunner.finishBundle` after handlePushBacks?
We need to force-finish the previous bundle as described in the comment.
Otherwise, DoFnTransformTest#testSideInputs fails.
checkAndInvokeBundle();
// With the new side input added, we may be able to process some pushed-back elements.
final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
long pushedBackAgainWatermark = Long.MAX_VALUE;
curPushBackWatermark = Long.MAX_VALUE
curPushedBackWatermark is a global variable.
We reset the 'cur's with 'pushedBack's.
curPushedBacks = pushedBackAgain;
curPushedBackWatermark = pushedBackAgainWatermark;
Yes, but I think we don't need the `pushedBackAgainWatermark` variable if we just set `curPushBackWatermark = Long.MAX_VALUE` here?
// With the new side input added, we may be able to process some pushed-back elements.
final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
long pushedBackAgainWatermark = Long.MAX_VALUE;
for (WindowedValue<InputT> curPushedBack : curPushedBacks) {
final
@@ -104,6 +104,7 @@ private boolean isWatermarkTriggerTime() {
  private Object retrieveElement() throws NoSuchElementException, IOException {
    // Emit watermark
    if (!bounded && isWatermarkTriggerTime()) {
      // index=0 as there is only 1 input stream
please remove this comment
Thanks @taegeonum. I've updated the code.
if (bundleFinished) {
  bundleFinished = false;
  doFnRunner.startBundle();
  if (pushBackRunner == null) {
That will give us 3 duplicate methods.
I left a comment on refactoring. Maybe we can come back to this when implementing streaming partial combiner (i.e., combining ReduceFn) which will probably also use pushBackRunner?
JIRA: NEMO-216: Support side inputs and windowing
(+ NEMO-251, NEMO-259)
Major changes:
Minor changes to note:
Tests for the changes:
Other comments: