From 3beb65b8b3ee13d9901763c0fc2138613a36d154 Mon Sep 17 00:00:00 2001
From: zhuhaifengleon
Date: Wed, 26 Oct 2016 15:48:31 +0800
Subject: [PATCH] [FLINK-4923] [Metrics] Expose input/output buffers and bufferPool usage as a metric for a Task

---
Review notes (commentary only; this text sits between the "---" marker and
the diffstat and is not part of the commit message or of any hunk).

What the hunks below do:
  * PipelinedSubpartition / SpillableSubpartition: add
    getNumberOfQueuedBuffers(), which returns buffers.size() while
    synchronized on `buffers`.
  * ResultPartition: adds getBufferPool() and getNumberOfQueuedBuffers().
    The latter sums the per-subpartition counts for PipelinedSubpartition
    and SpillableSubpartition instances; any other ResultSubpartition type
    contributes 0.
  * SingleInputGate: adds getBufferPool() and getNumberOfQueuedBuffers().
    The latter sums over the values of `inputChannels` that are
    RemoteInputChannel instances; other channel types contribute 0.
  * TaskIOMetricGroup: creates a "Buffers" sub-group in the constructor and
    exposes it through getBuffersGroup().
  * Task: in the `public Task(` hunk, right after the executing thread is
    created, registers four gauges on that group: "numIn" and "numOut"
    (sums of getNumberOfQueuedBuffers() over inputGates / producedPartitions)
    and "InPoolUsage" / "OutPoolUsage", computed as
    (sum(getNumBuffers()) - sum(getNumberOfAvailableMemorySegments()))
    / sum(getNumBuffers()), or 0.0f when the summed pool size is 0.

Points to confirm before merging:
  * NOTE(review): the getPageSize() context in SingleInputGate checks
    `bufferPool != null`, so the field can apparently be unset. Both
    *PoolUsageGauge classes call getBufferPool().get...() without a null
    check, and the gauges are registered in the Task constructor. If a
    reporter can poll before the pools are attached this would throw a
    NullPointerException -- confirm, and guard if so.
  * NOTE(review): SingleInputGate.getNumberOfQueuedBuffers() iterates
    `inputChannels` with no lock visible in this hunk. Confirm whether
    another thread can modify that map while a metrics reporter iterates it.
  * NOTE(review): the usage formula presumes that
    getNumberOfAvailableMemorySegments() counts every currently unused
    buffer of the pool -- verify against the BufferPool contract.
  * NOTE(review): the two `instanceof` checks in
    ResultPartition.getNumberOfQueuedBuffers() are independent `if`s rather
    than `if / else if`; a shared accessor on ResultSubpartition would avoid
    the casts -- TODO consider.
  * NOTE(review): gauge names mix casing ("numIn"/"numOut" vs.
    "InPoolUsage"/"OutPoolUsage") -- confirm this is intended, since the
    names become externally visible metric identifiers.
  * NOTE(review): `Map.Entry entry` is raw yet `entry.getValue()` is assigned
    to an InputChannel without a cast, and `implements Gauge`,
    `extends ProxyMetricGroup` and the `From:` header carry no
    angle-bracket part. It looks like "<...>" text was lost in transit;
    compare with the original commit before applying.
  * Line breaks and indentation in this copy were re-established from the
    hunk line counts (tabs assumed) -- TODO confirm against the original.

 .../partition/PipelinedSubpartition.java      |  6 ++
 .../io/network/partition/ResultPartition.java | 21 +++++
 .../partition/SpillableSubpartition.java      |  6 ++
 .../partition/consumer/SingleInputGate.java   | 18 +++++
 .../metrics/groups/TaskIOMetricGroup.java     |  9 +++
 .../flink/runtime/taskmanager/Task.java       | 79 +++++++++++++++++++
 6 files changed, 139 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097de5d0fb3..a6cf03b739d40d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -220,4 +220,10 @@ boolean registerListener(NotificationListener listener) {
 			throw new IllegalStateException("Already registered listener.");
 		}
 	}
+
+	public int getNumberOfQueuedBuffers() {
+		synchronized (buffers) {
+			return buffers.size();
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 5bbfab1e944ba1..a19e6163dad827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -220,6 +220,10 @@ public BufferProvider getBufferProvider() {
 		return bufferPool;
 	}
 
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	public int getTotalNumberOfBuffers() {
 		return totalNumberOfBuffers;
 	}
@@ -228,6 +232,23 @@ public long getTotalNumberOfBytes() {
 		return totalNumberOfBytes;
 	}
 
+	public int getNumberOfQueuedBuffers() {
+		int totalBuffers = 0;
+
+		for (ResultSubpartition subpartition : subpartitions) {
+
+			if (subpartition instanceof PipelinedSubpartition) {
+				totalBuffers += ((PipelinedSubpartition) subpartition).getNumberOfQueuedBuffers();
+			}
+
+			if (subpartition instanceof SpillableSubpartition) {
+				totalBuffers += ((SpillableSubpartition) subpartition).getNumberOfQueuedBuffers();
+			}
+		}
+
+		return totalBuffers;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3e4692a62889b9..e2b8692a080a5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -227,4 +227,10 @@ public String toString() {
 				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
 				spillWriter != null);
 	}
+
+	public int getNumberOfQueuedBuffers() {
+		synchronized (buffers) {
+			return buffers.size();
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f4e4325cc630d0..cbd89d8a762461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -212,6 +212,10 @@ BufferProvider getBufferProvider() {
 		return bufferPool;
 	}
 
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	@Override
 	public int getPageSize() {
 		if (bufferPool != null) {
@@ -222,6 +226,20 @@ public int getPageSize() {
 		}
 	}
 
+	public int getNumberOfQueuedBuffers() {
+		int totalBuffers = 0;
+
+		for (Map.Entry entry: inputChannels.entrySet()) {
+			InputChannel channel = entry.getValue();
+
+			if (channel instanceof RemoteInputChannel) {
+				totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
+			}
+		}
+
+		return totalBuffers;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a726c2683f146d..41f368d601cf21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -30,12 +31,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup {
 	private final Counter numBytesInLocal;
 	private final Counter numBytesInRemote;
 
+	private final MetricGroup buffers;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
 		this.numBytesOut = counter("numBytesOut");
 		this.numBytesInLocal = counter("numBytesInLocal");
 		this.numBytesInRemote = counter("numBytesInRemote");
+
+		this.buffers = addGroup("Buffers");
 	}
 
 	public Counter getNumBytesOutCounter() {
@@ -49,4 +54,8 @@ public Counter getNumBytesInLocalCounter() {
 	public Counter getNumBytesInRemoteCounter() {
 		return numBytesInRemote;
 	}
+
+	public MetricGroup getBuffersGroup() {
+		return buffers;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd522bd9295969..b84df4f1bf70cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -347,6 +348,12 @@ public Task(
 
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+		// add metric for buffers
+		this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numIn", new InputBuffersGauge());
+		this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numOut", new OutputBuffersGauge());
+		this.metrics.getIOMetricGroup().getBuffersGroup().gauge("InPoolUsage", new InputBufferPoolUsageGauge());
+		this.metrics.getIOMetricGroup().getBuffersGroup().gauge("OutPoolUsage", new OutputBufferPoolUsageGauge());
 	}
 
 	// ------------------------------------------------------------------------
@@ -1264,4 +1271,76 @@ public void run() {
 			}
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  metrics
+	// ------------------------------------------------------------------------
+
+	private class InputBuffersGauge implements Gauge {
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for(SingleInputGate inputGate: inputGates) {
+				totalBuffers += inputGate.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	private class OutputBuffersGauge implements Gauge {
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for(ResultPartition producedPartition: producedPartitions) {
+				totalBuffers += producedPartition.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	private class InputBufferPoolUsageGauge implements Gauge {
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for(SingleInputGate inputGate: inputGates) {
+				availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
+				bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+			}
+
+			if (bufferPoolSize != 0) {
+				return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+			} else {
+				return 0.0f;
+			}
+		}
+	}
+
+	private class OutputBufferPoolUsageGauge implements Gauge {
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for(ResultPartition resultPartition: producedPartitions){
+				availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
+				bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+			}
+
+			if (bufferPoolSize != 0) {
+				return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+			} else {
+				return 0.0f;
+			}
+		}
+	}
 }