From 986d96fdf543da983edc3e0c6fac77c1ca9d3297 Mon Sep 17 00:00:00 2001 From: zhuhaifengleon Date: Thu, 27 Oct 2016 15:22:13 +0800 Subject: [PATCH] [FLINK-4923] [Metrics] pull the buffers Metric calss and change the metris name --- .../partition/PipelinedSubpartition.java | 3 +- .../io/network/partition/ResultPartition.java | 9 +- .../network/partition/ResultSubpartition.java | 2 + .../partition/SpillableSubpartition.java | 3 +- .../partition/consumer/SingleInputGate.java | 23 +++- .../metrics/groups/TaskIOMetricGroup.java | 111 ++++++++++++++++++ .../flink/runtime/taskmanager/Task.java | 84 +------------ 7 files changed, 139 insertions(+), 96 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index a6cf03b739d40..266f581db9eab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -221,9 +221,8 @@ boolean registerListener(NotificationListener listener) { } } + @Override public int getNumberOfQueuedBuffers() { - synchronized (buffers) { return buffers.size(); - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index a19e6163dad82..f06cb43c5cddc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -236,14 +236,7 @@ public int getNumberOfQueuedBuffers() { int totalBuffers = 0; for (ResultSubpartition subpartition : subpartitions) { - - if (subpartition instanceof PipelinedSubpartition) { - totalBuffers += ((PipelinedSubpartition) subpartition).getNumberOfQueuedBuffers(); - } - - if (subpartition instanceof SpillableSubpartition) { - totalBuffers += ((SpillableSubpartition) subpartition).getNumberOfQueuedBuffers(); - } + totalBuffers += subpartition.getNumberOfQueuedBuffers(); } return totalBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index b7ca9c4af569c..31c8f73faecd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -83,4 +83,6 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + abstract public int getNumberOfQueuedBuffers(); + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index e2b8692a080a5..3f19559a3273c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -228,9 +228,8 @@ public String toString() { spillWriter != null); } + @Override public int getNumberOfQueuedBuffers() { - synchronized (buffers) { return buffers.size(); - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index cbd89d8a76246..17e8b06973026 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -227,17 +227,28 @@ public int getPageSize() { } public int getNumberOfQueuedBuffers() { - int totalBuffers = 0; + int retry = 0; - for (Map.Entry entry: inputChannels.entrySet()) { - InputChannel channel = entry.getValue(); + // re-try 3 times, if fails, return 0 for "unknown" + while(retry < 3) { + try { + int totalBuffers = 0; - if (channel instanceof RemoteInputChannel) { - totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers(); + for (Map.Entry entry : inputChannels.entrySet()) { + InputChannel channel = entry.getValue(); + + if (channel instanceof RemoteInputChannel) { + totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers(); + } + } + + return totalBuffers; + } catch (Exception e) { + retry++; } } - return totalBuffers; + return 0; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 41f368d601cf2..0727f21e0e731 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -19,7 +19,11 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; /** * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is @@ -58,4 +62,111 @@ public Counter getNumBytesInRemoteCounter() { public MetricGroup getBuffersGroup() { return buffers; } + + // ------------------------------------------------------------------------ + // metrics of Buffers group + // ------------------------------------------------------------------------ + /** + * Input received buffers gauge of a task + */ + public static class InputBuffersGauge implements Gauge { + + private final Task task; + + public InputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for(SingleInputGate inputGate: task.getAllInputGates()) { + totalBuffers += inputGate.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Output produced buffers gauge of a task + */ + public static class OutputBuffersGauge implements Gauge { + + private final Task task; + + public OutputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for(ResultPartition producedPartition: task.getProducedPartitions()) { + totalBuffers += producedPartition.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Input buffer pool usage gauge of a task + */ + public static class InputBufferPoolUsageGauge implements Gauge { + + private final Task task; + + public InputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for(SingleInputGate inputGate: task.getAllInputGates()) { + availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += inputGate.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } + + /** + * Output buffer pool usage gauge of a task + */ + public static class OutputBufferPoolUsageGauge implements Gauge { + + private final Task task; + + public OutputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for(ResultPartition resultPartition: task.getProducedPartitions()) { + availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += resultPartition.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b84df4f1bf70c..5dd0f0777ddbe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -57,6 +56,7 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateHandles; @@ -349,11 +349,11 @@ public Task( // finally, create the executing thread, but do not start it executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); - // add metric for buffers - this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numIn", new InputBuffersGauge()); - this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numOut", new OutputBuffersGauge()); - this.metrics.getIOMetricGroup().getBuffersGroup().gauge("InPoolUsage", new InputBufferPoolUsageGauge()); - this.metrics.getIOMetricGroup().getBuffersGroup().gauge("OutPoolUsage", new OutputBufferPoolUsageGauge()); + // add metrics for buffers + this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this)); + this.metrics.getIOMetricGroup().getBuffersGroup().gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this)); + this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this)); + this.metrics.getIOMetricGroup().getBuffersGroup().gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this)); } // ------------------------------------------------------------------------ @@ -1271,76 +1271,4 @@ public void run() { } } } - - // ------------------------------------------------------------------------ - // metrics - // ------------------------------------------------------------------------ - - private class InputBuffersGauge implements Gauge { - - @Override - public Integer getValue() { - int totalBuffers = 0; - - for(SingleInputGate inputGate: inputGates) { - totalBuffers += inputGate.getNumberOfQueuedBuffers(); - } - - return totalBuffers; - } - } - - private class OutputBuffersGauge implements Gauge { - - @Override - public Integer getValue() { - int totalBuffers = 0; - - for(ResultPartition producedPartition: producedPartitions) { - totalBuffers += producedPartition.getNumberOfQueuedBuffers(); - } - - return totalBuffers; - } - } - - private class InputBufferPoolUsageGauge implements Gauge { - - @Override - public Float getValue() { - int availableBuffers = 0; - int bufferPoolSize = 0; - - for(SingleInputGate inputGate: inputGates) { - availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments(); - bufferPoolSize += inputGate.getBufferPool().getNumBuffers(); - } - - if (bufferPoolSize != 0) { - return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; - } else { - return 0.0f; - } - } - } - - private class OutputBufferPoolUsageGauge implements Gauge { - - @Override - public Float getValue() { - int availableBuffers = 0; - int bufferPoolSize = 0; - - for(ResultPartition resultPartition: producedPartitions){ - availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments(); - bufferPoolSize += resultPartition.getBufferPool().getNumBuffers(); - } - - if (bufferPoolSize != 0) { - return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; - } else { - return 0.0f; - } - } - } }