Skip to content

Commit

Permalink
[FLINK-4923] [metrics] Expose Task's input/output buffer queue length…
Browse files Browse the repository at this point in the history
…s and bufferPool usage as a metrics

This closes apache#2693
  • Loading branch information
zhuhaifengleon authored and StephanEwen committed Oct 27, 2016
1 parent 27fd249 commit 7770ca3
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 6 deletions.
Expand Up @@ -220,4 +220,9 @@ boolean registerListener(NotificationListener listener) {
throw new IllegalStateException("Already registered listener.");
}
}

@Override
public int getNumberOfQueuedBuffers() {
return buffers.size();
}
}
Expand Up @@ -220,6 +220,10 @@ public BufferProvider getBufferProvider() {
return bufferPool;
}

public BufferPool getBufferPool() {
return bufferPool;
}

public int getTotalNumberOfBuffers() {
return totalNumberOfBuffers;
}
Expand All @@ -228,6 +232,16 @@ public long getTotalNumberOfBytes() {
return totalNumberOfBytes;
}

public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;

for (ResultSubpartition subpartition : subpartitions) {
totalBuffers += subpartition.getNumberOfQueuedBuffers();
}

return totalBuffers;
}

// ------------------------------------------------------------------------

/**
Expand Down
Expand Up @@ -83,4 +83,6 @@ protected Throwable getFailureCause() {

abstract public boolean isReleased();

abstract public int getNumberOfQueuedBuffers();

}
Expand Up @@ -227,4 +227,9 @@ public String toString() {
getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
spillWriter != null);
}

@Override
public int getNumberOfQueuedBuffers() {
return buffers.size();
}
}
Expand Up @@ -212,6 +212,10 @@ BufferProvider getBufferProvider() {
return bufferPool;
}

public BufferPool getBufferPool() {
return bufferPool;
}

@Override
public int getPageSize() {
if (bufferPool != null) {
Expand All @@ -222,6 +226,31 @@ public int getPageSize() {
}
}

public int getNumberOfQueuedBuffers() {
int retry = 0;

// re-try 3 times, if fails, return 0 for "unknown"
while(retry < 3) {
try {
int totalBuffers = 0;

for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry : inputChannels.entrySet()) {
InputChannel channel = entry.getValue();

if (channel instanceof RemoteInputChannel) {
totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
}
}

return totalBuffers;
} catch (Exception e) {
retry++;
}
}

return 0;
}

// ------------------------------------------------------------------------
// Setup/Life-cycle
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -19,6 +19,11 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
Expand All @@ -30,12 +35,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;

private final MetricGroup buffers;

public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);

this.numBytesOut = counter("numBytesOut");
this.numBytesInLocal = counter("numBytesInLocal");
this.numBytesInRemote = counter("numBytesInRemote");

this.buffers = addGroup("buffers");
}

public Counter getNumBytesOutCounter() {
Expand All @@ -49,4 +58,115 @@ public Counter getNumBytesInLocalCounter() {
public Counter getNumBytesInRemoteCounter() {
return numBytesInRemote;
}

public MetricGroup getBuffersGroup() {
return buffers;
}

// ------------------------------------------------------------------------
// metrics of Buffers group
// ------------------------------------------------------------------------
/**
* Input received buffers gauge of a task
*/
public static final class InputBuffersGauge implements Gauge<Integer> {

private final Task task;

public InputBuffersGauge(Task task) {
this.task = task;
}

@Override
public Integer getValue() {
int totalBuffers = 0;

for (SingleInputGate inputGate : task.getAllInputGates()) {
totalBuffers += inputGate.getNumberOfQueuedBuffers();
}

return totalBuffers;
}
}

/**
* Output produced buffers gauge of a task
*/
public static final class OutputBuffersGauge implements Gauge<Integer> {

private final Task task;

public OutputBuffersGauge(Task task) {
this.task = task;
}

@Override
public Integer getValue() {
int totalBuffers = 0;

for (ResultPartition producedPartition : task.getProducedPartitions()) {
totalBuffers += producedPartition.getNumberOfQueuedBuffers();
}

return totalBuffers;
}
}

/**
* Input buffer pool usage gauge of a task
*/
public static final class InputBufferPoolUsageGauge implements Gauge<Float> {

private final Task task;

public InputBufferPoolUsageGauge(Task task) {
this.task = task;
}

@Override
public Float getValue() {
int availableBuffers = 0;
int bufferPoolSize = 0;

for (SingleInputGate inputGate : task.getAllInputGates()) {
availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
} else {
return 0.0f;
}
}
}

/**
* Output buffer pool usage gauge of a task
*/
public static final class OutputBufferPoolUsageGauge implements Gauge<Float> {

private final Task task;

public OutputBufferPoolUsageGauge(Task task) {
this.task = task;
}

@Override
public Float getValue() {
int availableBuffers = 0;
int bufferPoolSize = 0;

for (ResultPartition resultPartition : task.getProducedPartitions()) {
availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
} else {
return 0.0f;
}
}
}
}
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
Expand Down Expand Up @@ -353,6 +355,15 @@ public Task(

// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

// add metrics for buffers
if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup();
bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
}
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -44,19 +44,14 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.SerializedValue;

import org.junit.Before;
import org.junit.Test;

import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

import static org.junit.Assert.assertFalse;
Expand Down

0 comments on commit 7770ca3

Please sign in to comment.