[FLINK-12490][network] Introduce Input and NetworkInput interfaces #8476
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
 * Basic interface for inputs of stream operators.
 */
@Internal
public interface Input extends AsyncDataInput<StreamElement>, Closeable {

Can we change it to implement AutoCloseable?
Generally speaking, Closeable is better and preferred: it extends AutoCloseable and provides a stronger, easier-to-use contract, namely that close() is idempotent.
Is there a reason why Input#close() cannot be idempotent?
> Generally speaking, Closeable is better and preferred: it extends AutoCloseable and provides a stronger, easier-to-use contract, namely that close() is idempotent.
You are right.
> Is there a reason why Input#close() cannot be idempotent?
No.
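For readers following the Closeable vs. AutoCloseable point: Closeable specifies that closing an already closed resource has no effect, while AutoCloseable only encourages this. A minimal sketch of what an idempotent close() could look like is shown below. The class IdempotentInput and its release counter are hypothetical and exist only for illustration; they are not part of this PR or of Flink.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical input used only for illustration; not a Flink class. */
final class IdempotentInput implements Closeable {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Counts how often resources were actually released (illustration only).
    private int releaseCount = 0;

    @Override
    public void close() {
        // Only the first call wins the compare-and-set and releases resources;
        // every later call is a no-op, as the Closeable contract asks for.
        if (closed.compareAndSet(false, true)) {
            releaseCount++; // stands in for releasing buffers, deserializers, etc.
        }
    }

    boolean isClosed() {
        return closed.get();
    }

    int getReleaseCount() {
        return releaseCount;
    }
}
```

Calling close() twice on such an object releases the underlying resources once, so callers do not need to track whether the input was already closed.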
About currentChannel in StreamInputProcessor, I left some comments. @pnowojski
private void processElement(StreamElement recordOrMark) throws Exception {
	if (recordOrMark.isWatermark()) {
		// handle watermark
		statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);

- It seems that some logic for assigning a value to currentChannel is missing, so currently it is always -1.
- I think that the input channel index should be hidden in NetworkInput.
True, there was a bug. However, I'm not sure if we should squeeze more functionality into NetworkInput. As it is now, it's a pretty small and nice class with a single responsibility, deserializing records, and it would be nice to keep it that way.
I was actually thinking about maybe later adding another wrapper class that would wrap Input and encapsulate watermark handling?
In fact, what concerns me is that the input channel index is exposed to input processors. As long as it's not exposed to the input processor (for example, by encapsulating it in a class as you said), it's fine with me.
		statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
	} else if (recordOrMark.isStreamStatus()) {
		// handle stream status
		statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);

Same as the previous comment.
	}
}

Optional<BufferOrEvent> bufferOrEvent = barrierHandler.pollNext();

In StreamTwoInputSelectableProcessor, input1 and input2 each correspond to a NetworkInput, and each NetworkInput should contain its own InputGate (inputGate1 or inputGate2). But I guess your BarrierHandler should contain one UnionInputGate that unions inputGate1 and inputGate2 in the case of two inputs (because BarrierHandler must see all input channels), so I think the current implementation of BarrierHandler may not meet the requirements of StreamTwoInputSelectableProcessor.
True, that's the next thing that I want to address. In this commit I just want to make sure that the basic abstraction of NetworkInput is not introducing performance regressions, which was one of our concerns in the offline discussions.
Got it.
 */
@Internal
public interface Input extends AsyncDataInput<StreamElement>, Closeable {
}

Similar to InputChannel#getChannelIndex(), adding int getInputIndex() to Input can simplify some processing in StreamTwoInputSelectableProcessor, so I suggest adding it.
I agree, but I didn't want to add dead code now. We can extend this interface in a follow-up commit.
StreamElement recordOrMark = input.pollNext();
if (recordOrMark == null) {
	input.isAvailable().get();
	return input.isFinished();

Shouldn't this be return !input.isFinished()?
😳
This might explain deadlocks that were found by our tests :)
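To make the inverted return value concrete, here is a minimal, self-contained sketch of such a processing loop. It assumes that processInput() returning true means "call me again" and false means "the input is exhausted"; that reading is inferred from the review comment, not confirmed by the snippet. QueueInput and ProcessorSketch are hypothetical stand-ins, not the classes in this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory input; finished as soon as its queue is drained. */
final class QueueInput {
    private final Queue<String> records = new ArrayDeque<>();

    QueueInput(String... initial) {
        for (String record : initial) {
            records.add(record);
        }
    }

    String pollNext() {
        return records.poll(); // null when nothing is queued
    }

    boolean isFinished() {
        return records.isEmpty();
    }

    CompletableFuture<?> isAvailable() {
        // This toy input never blocks, so the future is always completed.
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical processor showing the corrected return value. */
final class ProcessorSketch {
    private final QueueInput input;
    private int processed = 0;

    ProcessorSketch(QueueInput input) {
        this.input = input;
    }

    /** Returns true if the caller should invoke this method again. */
    boolean processInput() {
        String record = input.pollNext();
        if (record == null) {
            input.isAvailable().join();  // wait until input is available or finished
            return !input.isFinished();  // keep going only while NOT finished
        }
        processed++;
        return true;
    }

    int runToCompletion() {
        while (processInput()) {
            // loop until the input reports that it is finished
        }
        return processed;
    }
}
```

With the negation in place, the loop stops once the input is finished. Returning input.isFinished() directly would instead ask the caller to continue exactly when nothing more can arrive.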
I had some more comments on top of what @sunhaibotb already suggested. From the description, I also assume that the two-input case is intentionally not included in this PR, right?
 * Basic interface for inputs of stream operators.
 */
@Internal
public interface Input extends AsyncDataInput<StreamElement>, Closeable {

If I look at which interfaces are extended, the name of this interface looks a bit strange/oversimplified from that perspective. So does the comment, btw.
I've also noticed that (I was struggling with finding a good name for AsyncDataInput), but what would you suggest?
Maybe something along the lines of StreamTask[Async][Data]Input. This should make the role of this interface a lot clearer than the super generic name that it has now.
StreamTaskInput is not that bad. And the concrete class StreamTaskNetworkInput?
👍 That sounds like two names that create a mental model in my head about what this does.
@sunhaibotb, are you fine with those renames? (And fyi, the renaming will affect a WIP StreamSelectableTwoInputStreamProcessor PR.)
@pnowojski @sunhaibotb ping so that we can address this if we agree upon my suggestion.
> @sunhaibotb, are you fine with those renames? (And fyi, the renaming will affect a WIP StreamSelectableTwoInputStreamProcessor PR.)
It's fine with me. @pnowojski @StefanRRichter
I've renamed the classes.
@@ -76,4 +62,6 @@
	 * @return The duration in nanoseconds
	 */
	long getAlignmentDurationNanos();

	int getNumberOfInputChannels();

Doc
getNumberOfInputChannels // @return number of input channels
:(
getAlignmentDurationNanos // @return The duration in nanoseconds
? But it is really a nit.
Yes I know and I'm not a big fan of it as well :)
(maybe you didn't notice, but I've already added the comment you requested yesterday.)
Upsi, no, I first replied to your answers and only now checked out the updated branch :D
 * The channel from which a buffer came, tracked so that we can appropriately map
 * the watermarks and watermark statuses to the correct channel index of the correct valve.
 */
private int currentChannel = -1;

Nit/Personal taste: I would do all field initialization in constructors only.
Also would use a static final int UNSPECIFIED = -1;
This field could currently also just be a local variable.
import java.util.concurrent.CompletableFuture;

/**
 * Implementation of taking {@link InputGate} as {@link Input}.

I know in some sense that is true, but it actually takes a CheckpointBarrierHandler.
@Internal
public final class NetworkInput implements Input {

	private final int numberOfInputChannels;

At least currently, this field has no real use.
Thanks for the reviews @sunhaibotb and @StefanRRichter. Sorry for pushing the PR with such obvious bugs (Travis was not passing because of them).
There was also one more bug that we missed (found through further deadlocks on Travis), and it concerns the already merged InputGate#isAvailable() changes. I have fixed it in this PR in a new commit:
[hotfix][network] Fix InputGate#isAvailable not completed while InputGate#isFinished is true
Please check it out.
General comment first: I noticed that Travis indicates some related test failures.
3b85ebe to 6f25edf
@pnowojski I cannot check the Travis result because the build is not going through.
For the most part the PR looks good. I had a couple more comments, and we also require a working build.
@Override
public int getLastChannel() {
	return currentChannel;

Consistent naming between field and method?
Hmm, you are right. Somehow I felt that lastChannel is a better name for the user of the class while currentChannel better reflects the logic inside, but consistency trumps this.
@@ -254,7 +248,19 @@ private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate in
			throw new IllegalStateException("Couldn't find input gate in set of remaining " +
				"input gates.");
		}
		if (isFinished()) {
			markAvailable();

The naming is now a bit misleading if I read this as text: this is finished and no event will become available anymore, so I mark it as available? Maybe this shifted the whole concept from indicating availability of data to indicating that something in the status has changed: either data is available or the input noticed that it is finished.
Check my other response. Actually, I was never thinking about this in terms of "data availability" but in terms of "input being available for processing/handling". If the input is in the isFinished state, it has to be isAvailable as well from this perspective.
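The argument that a finished input must also be "available" can be illustrated with a small sketch: if the availability future is not completed when the input finishes, a reader blocked on that future never wakes up. GateSketch below is a hypothetical, heavily simplified stand-in for an input gate. Its method names mirror the discussion, but its internals are assumptions and not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gate; not Flink's InputGate. */
final class GateSketch {
    private final Queue<String> buffers = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();
    private boolean finished = false;

    synchronized void add(String buffer) {
        buffers.add(buffer);
        available.complete(null); // data arrived: wake up any waiting reader
    }

    synchronized void finish() {
        finished = true;
        // Complete the future on finish as well. Without this, a reader
        // blocked on isAvailable() would wait forever.
        available.complete(null);
    }

    synchronized String pollNext() {
        String next = buffers.poll();
        // Reset the future only if more data may still arrive later.
        if (buffers.isEmpty() && !finished && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return next;
    }

    synchronized boolean isFinished() {
        return finished && buffers.isEmpty();
    }

    synchronized CompletableFuture<?> isAvailable() {
        return available;
    }
}
```

In this reading, isAvailable() answers "can the reader make progress?", where progress includes observing that the input has ended, rather than "is there more data?".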
 * not completed futures should become completed once there is more input available or if
 * the input {@link #isFinished()}.
 */
CompletableFuture<?> isAvailable();

So should this be isAvailableOrFinished or maybe awaitStatusChange?
Originally I was proposing the name isBlocked() for this method: whether the input/source/thingy is blocked and waiting for more input. Someone else (Stephan?) preferred isAvailable, since isBlocked returning a future was confusing for him and Becket. IMO isAvailable is good enough, as long as you think about it in similar terms: is the input available for further processing, in other words, the opposite of isBlocked(). Not whether it has more data available.
Either way I would vote isBlocked > isAvailable > available > awaitStatusChange > isAvailableOrFinished.
Can we postpone the name voting until the source interface refactoring, because first and foremost I want this naming convention to be in sync in those two places? We can rename it quite easily, but I don't want to spend too much time discussing it twice with a different audience.
I have addressed/responded to your comments @StefanRRichter. Tests were not passing because of some indirect conflict when rebasing the code on the latest master that happened almost at the same time I was pushing the code. Rebased again, hopefully travis will be finally green.
 */
@Internal
public interface Input extends NullableAsyncDataInput<StreamElement>, Closeable {
	int UNSPECIFED = -1;

BTW UNSPECIFED -> UNSPECIFIED
Looks good so far, found one more typo. I think we can postpone the discussion about the name of the available-method. However, I would like to have the discussion about the name of Input resolved before merging.
202c539 to 9b8cd5d
5d59353 to 5342482
552310f to 42175da
…Gate#isFinished is true
Finished InputGates should be available for reading; otherwise the reader can deadlock waiting on a future that will never be completed.
Make the most commonly expected condition the first one.
This PR depends on #8467
It introduces the Input interface and its implementation NetworkInput (extracted from StreamInputProcessor), and uses them in StreamInputProcessor.
A local benchmark run shows an increase in the record throughput of SumLongsBenchmark from 8600 records/ms to 9200-9400 records/ms.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation