From 7770ca3607d0e26f2d7a1b64ad9d8fcffed2b451 Mon Sep 17 00:00:00 2001 From: zhuhaifengleon Date: Wed, 26 Oct 2016 15:48:31 +0800 Subject: [PATCH] [FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as a metrics This closes #2693 --- .../partition/PipelinedSubpartition.java | 5 + .../io/network/partition/ResultPartition.java | 14 ++ .../network/partition/ResultSubpartition.java | 2 + .../partition/SpillableSubpartition.java | 5 + .../partition/consumer/SingleInputGate.java | 29 +++++ .../metrics/groups/TaskIOMetricGroup.java | 120 ++++++++++++++++++ .../flink/runtime/taskmanager/Task.java | 11 ++ .../taskmanager/TaskAsyncCallTest.java | 7 +- 8 files changed, 187 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2d7097de5d0fb3..266f581db9eabc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -220,4 +220,9 @@ boolean registerListener(NotificationListener listener) { throw new IllegalStateException("Already registered listener."); } } + + @Override + public int getNumberOfQueuedBuffers() { + return buffers.size(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 5bbfab1e944ba1..f06cb43c5cddc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -220,6 +220,10 @@ public BufferProvider getBufferProvider() { return bufferPool; } + public BufferPool getBufferPool() { + return bufferPool; + } + public int getTotalNumberOfBuffers() { return totalNumberOfBuffers; } @@ -228,6 +232,16 @@ public long getTotalNumberOfBytes() { return totalNumberOfBytes; } + public int getNumberOfQueuedBuffers() { + int totalBuffers = 0; + + for (ResultSubpartition subpartition : subpartitions) { + totalBuffers += subpartition.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index b7ca9c4af569c3..31c8f73faecd28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -83,4 +83,6 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + abstract public int getNumberOfQueuedBuffers(); + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 3e4692a62889b9..3f19559a3273c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -227,4 +227,9 @@ public String toString() { getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, spillWriter != null); } + + @Override + public int getNumberOfQueuedBuffers() { + return buffers.size(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index f4e4325cc630d0..17e8b069730268 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -212,6 +212,10 @@ BufferProvider getBufferProvider() { return bufferPool; } + public BufferPool getBufferPool() { + return bufferPool; + } + @Override public int getPageSize() { if (bufferPool != null) { @@ -222,6 +226,31 @@ public int getPageSize() { } } + public int getNumberOfQueuedBuffers() { + int retry = 0; + + // re-try 3 times, if fails, return 0 for "unknown" + while(retry < 3) { + try { + int totalBuffers = 0; + + for (Map.Entry entry : inputChannels.entrySet()) { + InputChannel channel = entry.getValue(); + + if (channel instanceof RemoteInputChannel) { + totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers(); + } + } + + return totalBuffers; + } catch (Exception e) { + retry++; + } + } + + return 0; + } + // ------------------------------------------------------------------------ // Setup/Life-cycle // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index a726c2683f146d..6621df2a22adf1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -19,6 +19,11 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; /** * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is @@ -30,12 +35,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup { private final Counter numBytesInLocal; private final Counter numBytesInRemote; + private final MetricGroup buffers; + public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); this.numBytesOut = counter("numBytesOut"); this.numBytesInLocal = counter("numBytesInLocal"); this.numBytesInRemote = counter("numBytesInRemote"); + + this.buffers = addGroup("buffers"); } public Counter getNumBytesOutCounter() { @@ -49,4 +58,115 @@ public Counter getNumBytesInLocalCounter() { public Counter getNumBytesInRemoteCounter() { return numBytesInRemote; } + + public MetricGroup getBuffersGroup() { + return buffers; + } + + // ------------------------------------------------------------------------ + // metrics of Buffers group + // ------------------------------------------------------------------------ + /** + * Input received buffers gauge of a task + */ + public static final class InputBuffersGauge implements Gauge { + + private final Task task; + + public InputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for (SingleInputGate inputGate : task.getAllInputGates()) { + totalBuffers += inputGate.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Output produced buffers gauge of a task + */ + public static final class OutputBuffersGauge implements Gauge { + + private final Task task; + + public OutputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for (ResultPartition producedPartition : task.getProducedPartitions()) { + totalBuffers += producedPartition.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Input buffer pool usage gauge of a task + */ + public static final class InputBufferPoolUsageGauge implements Gauge { + + private final Task task; + + public InputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for (SingleInputGate inputGate : task.getAllInputGates()) { + availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += inputGate.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } + + /** + * Output buffer pool usage gauge of a task + */ + public static final class OutputBufferPoolUsageGauge implements Gauge { + + private final Task task; + + public OutputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for (ResultPartition resultPartition : task.getProducedPartitions()) { + availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += resultPartition.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f09e88a37426f6..7ce9b0be0a5493 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateHandles; @@ -353,6 +355,15 @@ public Task( // finally, create the executing thread, but do not start it executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); + + // add metrics for buffers + if (this.metrics != null && this.metrics.getIOMetricGroup() != null) { + MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup(); + bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this)); + bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this)); + bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this)); + bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this)); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 7dd67ed46c00fc..ed107c7dd58019 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -44,19 +44,14 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.SerializedValue; + import org.junit.Before; import org.junit.Test; import java.net.URL; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertFalse;