Skip to content

Commit

Permalink
[FLINK-4923] [Metrics] Expose input/output buffers and bufferPool usa…
Browse files Browse the repository at this point in the history
…ge as a metric for a Task
  • Loading branch information
zhuhaifengleon committed Oct 26, 2016
1 parent 11fa089 commit 3beb65b
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 0 deletions.
Expand Up @@ -220,4 +220,10 @@ boolean registerListener(NotificationListener listener) {
throw new IllegalStateException("Already registered listener.");
}
}

public int getNumberOfQueuedBuffers() {
synchronized (buffers) {
return buffers.size();
}
}
}
Expand Up @@ -220,6 +220,10 @@ public BufferProvider getBufferProvider() {
return bufferPool;
}

public BufferPool getBufferPool() {
return bufferPool;
}

public int getTotalNumberOfBuffers() {
return totalNumberOfBuffers;
}
Expand All @@ -228,6 +232,23 @@ public long getTotalNumberOfBytes() {
return totalNumberOfBytes;
}

public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;

for (ResultSubpartition subpartition : subpartitions) {

if (subpartition instanceof PipelinedSubpartition) {
totalBuffers += ((PipelinedSubpartition) subpartition).getNumberOfQueuedBuffers();
}

if (subpartition instanceof SpillableSubpartition) {
totalBuffers += ((SpillableSubpartition) subpartition).getNumberOfQueuedBuffers();
}
}

return totalBuffers;
}

// ------------------------------------------------------------------------

/**
Expand Down
Expand Up @@ -227,4 +227,10 @@ public String toString() {
getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
spillWriter != null);
}

public int getNumberOfQueuedBuffers() {
synchronized (buffers) {
return buffers.size();
}
}
}
Expand Up @@ -212,6 +212,10 @@ BufferProvider getBufferProvider() {
return bufferPool;
}

public BufferPool getBufferPool() {
return bufferPool;
}

@Override
public int getPageSize() {
if (bufferPool != null) {
Expand All @@ -222,6 +226,20 @@ public int getPageSize() {
}
}

public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;

for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry: inputChannels.entrySet()) {
InputChannel channel = entry.getValue();

if (channel instanceof RemoteInputChannel) {
totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
}
}

return totalBuffers;
}

// ------------------------------------------------------------------------
// Setup/Life-cycle
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
Expand All @@ -30,12 +31,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;

private final MetricGroup buffers;

public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);

this.numBytesOut = counter("numBytesOut");
this.numBytesInLocal = counter("numBytesInLocal");
this.numBytesInRemote = counter("numBytesInRemote");

this.buffers = addGroup("Buffers");
}

public Counter getNumBytesOutCounter() {
Expand All @@ -49,4 +54,8 @@ public Counter getNumBytesInLocalCounter() {
public Counter getNumBytesInRemoteCounter() {
return numBytesInRemote;
}

public MetricGroup getBuffersGroup() {
return buffers;
}
}
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
Expand Down Expand Up @@ -347,6 +348,12 @@ public Task(

// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

// add metric for buffers
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numIn", new InputBuffersGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numOut", new OutputBuffersGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("InPoolUsage", new InputBufferPoolUsageGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("OutPoolUsage", new OutputBufferPoolUsageGauge());
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1264,4 +1271,76 @@ public void run() {
}
}
}

// ------------------------------------------------------------------------
// metrics
// ------------------------------------------------------------------------

private class InputBuffersGauge implements Gauge<Integer> {

@Override
public Integer getValue() {
int totalBuffers = 0;

for(SingleInputGate inputGate: inputGates) {
totalBuffers += inputGate.getNumberOfQueuedBuffers();
}

return totalBuffers;
}
}

private class OutputBuffersGauge implements Gauge<Integer> {

@Override
public Integer getValue() {
int totalBuffers = 0;

for(ResultPartition producedPartition: producedPartitions) {
totalBuffers += producedPartition.getNumberOfQueuedBuffers();
}

return totalBuffers;
}
}

private class InputBufferPoolUsageGauge implements Gauge<Float> {

@Override
public Float getValue() {
int availableBuffers = 0;
int bufferPoolSize = 0;

for(SingleInputGate inputGate: inputGates) {
availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
} else {
return 0.0f;
}
}
}

private class OutputBufferPoolUsageGauge implements Gauge<Float> {

@Override
public Float getValue() {
int availableBuffers = 0;
int bufferPoolSize = 0;

for(ResultPartition resultPartition: producedPartitions){
availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
} else {
return 0.0f;
}
}
}
}

0 comments on commit 3beb65b

Please sign in to comment.