New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12201][network,metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric #8320
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@flinkbot attention @azagrebin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this refactoring @zhijiangW ! It looks good overall to me, I've left couple of small comments.
...time/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Show resolved
Hide resolved
309f321
to
0500cee
Compare
@azagrebin I rebased the codes to not rely on the commit of #8310 and made some changes. |
@flinkbot approve all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhijiangW ! I've left some comments.
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
@azagrebin I have rebased the master to solve the conflicts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhijiangW ! It looks quite good to me, I left some smaller comments. Especially about benchmark which we should run before merge.
...time/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
Show resolved
Hide resolved
Thanks for further reviews @azagrebin ! I pushed one fixup commit for addressing above comments! And I would submit the micro benchmark comparison later. |
.../java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
Outdated
Show resolved
Hide resolved
I have squashed the commits for addressing the comments. @azagrebin |
.../java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
Outdated
Show resolved
Hide resolved
92bdba5
to
fc5d18f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change @zhijiangW. I've left couple of comments.
FYI, this PR will have minor conflicts with my changes in #8476 . Just in case my PR will be merged before, I've made some minor changes to the InputGate
interface.
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
Outdated
Show resolved
Hide resolved
...me/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
Show resolved
Hide resolved
…enchmarkEnvironment
… to increment numBytesIn metric Incrementing of numBytesIn metric in SingleInputGate does not depend on shuffle service and can be moved out of network internals into Task. Task could wrap InputGate provided by ShuffleService with InputGateWithMetrics which would increment numBytesIn metric.
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM merging @zhijiangW let me know once you fix the typo in the commit, I'll merge it then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhijiangW ! LGTM
The implementations of getSize and getSizeUnsafe are exactly the same now, which do not need the synchronized way. So We could remove the getSizeUnsafe to make it clean and clear.
What is the purpose of the change
numBytesIn
metric inSingleInputGate
does not depend on shuffle service and can be moved out of network internals intoTask
.Task
could wrapInputGate
provided byShuffleService
withInputGateWithMetrics
which would increment numBytesIn metric.*Brief change log
InputGateWithMetrics
to wrapInputGate
fromShuffleService
inTask
SingleInputGate
and related creation.Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation