-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-12603][network] Remove getOwningTaskName method from InputGate #8529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
@flinkbot attention @azagrebin |
azagrebin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhijiangW ! I like the refactoring and left some smaller comments. One bigger concern is that it seems we still depends directly on some network specific options and I am not sure whether they make sense for all shuffle services.
...streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
Outdated
Show resolved
Hide resolved
...eaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
Outdated
Show resolved
Hide resolved
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
Outdated
Show resolved
Hide resolved
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
Outdated
Show resolved
Hide resolved
...me/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
Show resolved
Hide resolved
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
Outdated
Show resolved
Hide resolved
|
Thanks for the reviews and good suggestions @azagrebin ! Yes, we still rely on some network options in I pushed and squashed the commits for addressing the other issues. |
azagrebin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing comments @zhijiangW !
getOwningTaskName looks good to me.
We can consider NETWORK_CREDIT_MODEL in other PR. Maybe, we add ShuffleService#supportsCachingOrBlockingInputChannels API flag or something like that. Then it could be propagated somehow to StreamTask. getPage might refactored similar way.
At the moment I would suggest we do not remove getPage in this PR because querying network configuration directly exposes its details more than justing having InputGate.getPage. I also discussed with @pnowojski using Buffer.size for this but it looks like a bigger effort which could be a right way to go at the end but not obvious and probably needs a separate issue. Removing of InputGate.getPage does not look like a big blocker for Shuffle API and could stay in InputGate for now although I agree it is not perfect to have it as InputGate.getPage.
could we reduce the scope of this PR by addressing only getOwningTaskName at the moment? because this part looks already mergable, WDYT?
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
|
Thanks for further reviews @azagrebin ! |
b7707c8 to
14aa43b
Compare
azagrebin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhijiangW ! LGTM 👍
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this PR @zhijiangW. I had some comments which we should resolve before merging.
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
Outdated
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
Outdated
Show resolved
Hide resolved
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
Outdated
Show resolved
Hide resolved
|
@tillrohrmann thanks for the confirmation and I have addressed the comments by squashing the commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my comments @zhijiangW. I had one last comment. Moreover, the build is not going through because of
[ERROR] /home/travis/build/apache/flink/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java:[58,9] method does not override or implement a method from a supertype
11:30:03.090 [ERROR] /home/travis/build/apache/flink/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java:[60,33] cannot find symbol
symbol: method getOwningTaskName()
location: variable inputGate of type org.apache.flink.runtime.io.network.partition.consumer.InputGate
...treaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
Outdated
Show resolved
Hide resolved
|
@tillrohrmann I have addressed the last comment and the travis issue is caused by another merged PR which is also solved now. |
In order to make abstract InputGate simple for extending new implementations in shuffle service architecture, we could remove unnecessary methods from it. InputGate#getOwningTaskName is only used for debugging log in BarrierBuffer and StreamInputProcessor. This task name could also be generated in StreamTask via Environment#getTaskInfo and Environment#getExecutionId. Then it could be passed into the constructors of BarrierBuffer/StreamInputProcessor for use.
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for addressing my comments @zhijiangW. LGTM. Merging this PR now.
What is the purpose of the change
In order to make abstract
InputGatesimple for extending new implementations in shuffle service architecture, we could remove unnecessary methods from it.Currently
InputGate#getOwningTaskNameis only used for debugging log inBarrierBufferandStreamInputProcessor. This task name could be got fromEnvironment#getTaskInfoand then be passed into the constructor ofBarrierBuffer/StreamInputProcessorfor use.Brief change log
TaskInfogetOwningTaskNamefromInputGateabstract methodsBarrierBufferandStreamInputProcessorVerifying this change
covered by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation