[FLINK-4923] [Metrics] Expose input/output buffers and bufferPool usa… #2693

Closed
wants to merge 2 commits into from

zhuhaifengleon
Contributor

The number of buffers and the buffer usage of the input/output bufferPools of a task reflect whether the task is congested.
So we expose the following metrics as a Buffers MetricGroup on the TaskIOMetricGroup; all of these metrics are gauges.

  1. numIn of Buffers: received buffers of all InputGates of a task
  2. numOut of Buffers: buffers of all produced ResultPartitions of a task
  3. InPoolUsage of Buffers: bufferPool usage of all InputGates of a task
  4. OutPoolUsage of Buffers: bufferPool usage of all produced ResultPartitions of a task
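
A minimal sketch of what such gauges could look like, for illustration only. The `Gauge` interface below is a local stand-in with the same single `getValue()` method as Flink's gauge; `BufferSource`, `QueueLengthGauge`, and `PoolUsageGauge` are hypothetical names, and treating "usage" as used buffers divided by pool size is an assumption, not something this description states.

```java
import java.util.List;

final class BufferGaugeSketch {

    /** Local stand-in for a gauge: a metric that reports a value on demand. */
    interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical view of one InputGate or ResultPartition. */
    interface BufferSource {
        int queuedBuffers();   // buffers currently queued
        int usedPoolBuffers(); // buffers currently taken from the pool
        int poolSize();        // total number of buffers in the pool
    }

    /** Sums the queued buffers over all sources of a task. */
    static final class QueueLengthGauge implements Gauge<Integer> {
        private final List<BufferSource> sources;

        QueueLengthGauge(List<BufferSource> sources) {
            this.sources = sources;
        }

        @Override
        public Integer getValue() {
            int total = 0;
            for (BufferSource source : sources) {
                total += source.queuedBuffers();
            }
            return total;
        }
    }

    /** Reports used / total pool buffers over all sources (0.0 if there is no pool). */
    static final class PoolUsageGauge implements Gauge<Float> {
        private final List<BufferSource> sources;

        PoolUsageGauge(List<BufferSource> sources) {
            this.sources = sources;
        }

        @Override
        public Float getValue() {
            int used = 0;
            int size = 0;
            for (BufferSource source : sources) {
                used += source.usedPoolBuffers();
                size += source.poolSize();
            }
            return size == 0 ? 0.0f : ((float) used) / size;
        }
    }

    /** Helper that builds a fixed-value source, used only for demonstration. */
    static BufferSource source(final int queued, final int used, final int size) {
        return new BufferSource() {
            @Override public int queuedBuffers() { return queued; }
            @Override public int usedPoolBuffers() { return used; }
            @Override public int poolSize() { return size; }
        };
    }
}
```

A persistently long queue combined with a pool usage close to 1.0 would be the kind of signal the description associates with congestion.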

@@ -220,4 +220,10 @@ boolean registerListener(NotificationListener listener) {
            throw new IllegalStateException("Already registered listener.");
        }
    }

    public int getNumberOfQueuedBuffers() {
        synchronized (buffers) {
Contributor

Can we avoid synchronization here? The metrics should never influence (block) the other code.
I would rather have the metrics be slightly off once in a while and avoid the lock here.
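
One way to follow this suggestion, sketched under assumptions: writers already hold the lock when they change the queue, so they can publish the current size to an `AtomicInteger`, and the metrics path reads that counter without locking. The class name and the `byte[]` element type are hypothetical, and this is not necessarily what the final patch does.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

final class QueuedBufferCounterSketch {

    // Queue guarded by its own monitor, as in the diff above.
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

    // Size published by writers; read lock-free by the metrics path.
    private final AtomicInteger queued = new AtomicInteger();

    void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            queued.set(buffers.size());
        }
    }

    byte[] poll() {
        synchronized (buffers) {
            byte[] buffer = buffers.poll();
            queued.set(buffers.size());
            return buffer;
        }
    }

    /** Metrics path: takes no lock, so the value may be slightly stale. */
    int getNumberOfQueuedBuffers() {
        return queued.get();
    }
}
```

The trade-off is the one named in the comment: the gauge can lag behind the queue for a moment, but reading it never blocks the data path.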


for (ResultSubpartition subpartition : subpartitions) {

    if (subpartition instanceof PipelinedSubpartition) {
Contributor

If you add the getNumberOfQueuedBuffers() method to ResultSubpartition then you do not need to check and cast here.
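
A small sketch of the suggested shape, assuming the method is declared abstract on the base class. The classes below are simplified stand-ins that only borrow names from the code under review, and the fixed counts exist purely for demonstration.

```java
import java.util.List;

final class SubpartitionSketch {

    /** Stand-in base class: every subpartition reports its own queue length. */
    abstract static class ResultSubpartition {
        abstract int getNumberOfQueuedBuffers();
    }

    /** Stand-in for the pipelined variant. */
    static final class PipelinedSubpartition extends ResultSubpartition {
        private final int queued;

        PipelinedSubpartition(int queued) {
            this.queued = queued;
        }

        @Override
        int getNumberOfQueuedBuffers() {
            return queued;
        }
    }

    /** Stand-in for a second variant (assumed here to be a spillable one). */
    static final class SpillableSubpartition extends ResultSubpartition {
        private final int queued;

        SpillableSubpartition(int queued) {
            this.queued = queued;
        }

        @Override
        int getNumberOfQueuedBuffers() {
            return queued;
        }
    }

    /** No instanceof check or cast: the call is dispatched polymorphically. */
    static int totalQueuedBuffers(List<ResultSubpartition> subpartitions) {
        int total = 0;
        for (ResultSubpartition subpartition : subpartitions) {
            total += subpartition.getNumberOfQueuedBuffers();
        }
        return total;
    }
}
```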

@@ -227,4 +227,10 @@ public String toString() {
                getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
                spillWriter != null);
    }

    public int getNumberOfQueuedBuffers() {
        synchronized (buffers) {
Contributor

Same here, try to avoid synchronization.

public int getNumberOfQueuedBuffers() {
    int totalBuffers = 0;

    for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry : inputChannels.entrySet()) {
Contributor

I think this map may change asynchronously, so it is probably better to catch exceptions here and retry a few times. If it fails three times, return -1 for "unknown".
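
The retry idea could be sketched roughly as follows. `readWithRetry` and the `IntSupplier` parameter are hypothetical; the supplier stands in for the unsynchronized loop over the input channels, and catching `RuntimeException` (for example a `ConcurrentModificationException`) is an assumption about which failures are worth retrying.

```java
import java.util.function.IntSupplier;

final class RetryingGaugeReadSketch {

    static final int UNKNOWN = -1;
    static final int MAX_ATTEMPTS = 3;

    /**
     * Runs an unsynchronized read up to MAX_ATTEMPTS times and
     * returns UNKNOWN if every attempt throws.
     */
    static int readWithRetry(IntSupplier unsafeRead) {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                return unsafeRead.getAsInt();
            } catch (RuntimeException e) {
                // The underlying collection probably changed mid-iteration; try again.
            }
        }
        return UNKNOWN;
    }
}
```

A gauge built on this would report -1 only in the rare case that all three attempts collide with a concurrent modification.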

// metrics
// ------------------------------------------------------------------------

private class InputBuffersGauge implements Gauge<Integer> {
Contributor

Can you pull these classes out into a separate file? The Task class is very large already.

Contributor Author

I have pulled these classes out into TaskIOMetricGroup, since the buffer metrics belong to the task I/O scope.

@StephanEwen
Contributor

Good addition, with some requests for changes.

I would suggest some name changes for the Gauges:

  • "numIn" --> "inputQueueLength"
  • "numOut" --> "outputQueueLength"
  • "InPoolUsage" --> "inPoolUsage"
  • "OutPoolUsage" --> "outPoolUsage"

@StephanEwen
Contributor

This looks good, thanks!
Merging this...

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Oct 27, 2016
@@ -347,6 +348,12 @@ public Task(

        // finally, create the executing thread, but do not start it
        executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

        // add metrics for buffers
        this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
Contributor

We could have a static initializer method in the TaskIOMetricGroup for this.

Contributor

i mean one method that initializes all gauges. it is a simple way of consolidating the metrics in one place.
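
A rough sketch of such a consolidating method, for illustration. `MetricGroup` and `TaskBuffers` below are hypothetical stand-ins rather than the real Flink types, and `initializeBufferMetrics` is an invented name; only the four gauge names come from the renaming suggested earlier in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

final class BufferMetricsInitSketch {

    /** Hypothetical stand-in for a metric group that records gauges by name. */
    static final class MetricGroup {
        final Map<String, Supplier<?>> gauges = new LinkedHashMap<>();

        void gauge(String name, Supplier<?> gauge) {
            gauges.put(name, gauge);
        }
    }

    /** Hypothetical source of the four buffer values of a task. */
    interface TaskBuffers {
        int inputQueueLength();
        int outputQueueLength();
        float inPoolUsage();
        float outPoolUsage();
    }

    /** Registers all buffer gauges in one place instead of in the Task constructor. */
    static void initializeBufferMetrics(MetricGroup buffers, TaskBuffers task) {
        buffers.gauge("inputQueueLength", task::inputQueueLength);
        buffers.gauge("outputQueueLength", task::outputQueueLength);
        buffers.gauge("inPoolUsage", task::inPoolUsage);
        buffers.gauge("outPoolUsage", task::outPoolUsage);
    }
}
```

With a method like this, the constructor would need a single call, and adding or renaming a buffer metric would touch only one place.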

Contributor

I have rebased and merged this already.
How about doing that in a follow-up patch?

Contributor Author

Stephan, have you merged this already? If so, I will do that in a follow-up patch after discussing it with Chesnay.

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Oct 27, 2016
@asfgit asfgit closed this in 344fe94 Oct 30, 2016
liuyuzhong pushed a commit to liuyuzhong/flink that referenced this pull request Dec 5, 2016