Skip to content

Commit

Permalink
[FLINK-4923] [Metrics] pull the buffers Metric class and change the m…
Browse files Browse the repository at this point in the history
…etrics name
  • Loading branch information
zhuhaifengleon committed Oct 27, 2016
1 parent 363e3ae commit 986d96f
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 96 deletions.
Expand Up @@ -221,9 +221,8 @@ boolean registerListener(NotificationListener listener) {
}
}

/**
 * Reports how many entries the {@code buffers} collection currently holds.
 *
 * <p>The size is read while holding the monitor of {@code buffers}.
 *
 * @return the current size of {@code buffers}
 */
@Override
public int getNumberOfQueuedBuffers() {
	final int queued;
	synchronized (buffers) {
		queued = buffers.size();
	}
	return queued;
}
}
Expand Up @@ -236,14 +236,7 @@ public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;

for (ResultSubpartition subpartition : subpartitions) {

if (subpartition instanceof PipelinedSubpartition) {
totalBuffers += ((PipelinedSubpartition) subpartition).getNumberOfQueuedBuffers();
}

if (subpartition instanceof SpillableSubpartition) {
totalBuffers += ((SpillableSubpartition) subpartition).getNumberOfQueuedBuffers();
}
totalBuffers += subpartition.getNumberOfQueuedBuffers();
}

return totalBuffers;
Expand Down
Expand Up @@ -83,4 +83,6 @@ protected Throwable getFailureCause() {

/**
 * Reports the released state of this subpartition.
 *
 * <p>NOTE(review): the exact contract is defined by the implementations, which are not
 * visible here — confirm before relying on it.
 *
 * @return the released flag as determined by the implementation
 */
public abstract boolean isReleased();

/**
 * Returns the number of buffers currently queued in this subpartition.
 *
 * <p>Callers sum this value over all subpartitions of a partition to obtain its
 * total queue length.
 *
 * @return the queued-buffer count reported by the implementation
 */
public abstract int getNumberOfQueuedBuffers();

}
Expand Up @@ -228,9 +228,8 @@ public String toString() {
spillWriter != null);
}

/**
 * Returns the current size of the {@code buffers} collection.
 *
 * <p>The read happens inside a {@code synchronized (buffers)} section.
 *
 * @return the number of entries in {@code buffers} at the time of the call
 */
@Override
public int getNumberOfQueuedBuffers() {
	final int size;
	synchronized (buffers) {
		size = buffers.size();
	}
	return size;
}
}
Expand Up @@ -227,17 +227,28 @@ public int getPageSize() {
}

public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;
int retry = 0;

for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry: inputChannels.entrySet()) {
InputChannel channel = entry.getValue();
// re-try 3 times, if fails, return 0 for "unknown"
while(retry < 3) {
try {
int totalBuffers = 0;

if (channel instanceof RemoteInputChannel) {
totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry : inputChannels.entrySet()) {
InputChannel channel = entry.getValue();

if (channel instanceof RemoteInputChannel) {
totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
}
}

return totalBuffers;
} catch (Exception e) {
retry++;
}
}

return totalBuffers;
return 0;
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -19,7 +19,11 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
Expand Down Expand Up @@ -58,4 +62,111 @@ public Counter getNumBytesInRemoteCounter() {
/**
 * Gives access to the metric group stored in the {@code buffers} field.
 *
 * @return the {@code buffers} metric group
 */
public MetricGroup getBuffersGroup() {
	return this.buffers;
}

// ------------------------------------------------------------------------
// metrics of Buffers group
// ------------------------------------------------------------------------
/**
 * Gauge exposing the number of buffers queued across all input gates of a task.
 */
public static class InputBuffersGauge implements Gauge<Integer> {

	/** Task whose input gates are inspected on every read. */
	private final Task task;

	/**
	 * @param task the task whose input gates are queried
	 */
	public InputBuffersGauge(Task task) {
		this.task = task;
	}

	/**
	 * Adds up {@code getNumberOfQueuedBuffers()} over every gate returned by
	 * {@code task.getAllInputGates()}.
	 *
	 * @return the summed queue length; 0 when the task reports no gates
	 */
	@Override
	public Integer getValue() {
		int queued = 0;
		for (SingleInputGate gate : task.getAllInputGates()) {
			queued += gate.getNumberOfQueuedBuffers();
		}
		return queued;
	}
}

/**
 * Gauge exposing the number of buffers queued across all produced partitions of a task.
 */
public static class OutputBuffersGauge implements Gauge<Integer> {

	/** Task whose produced partitions are inspected on every read. */
	private final Task task;

	/**
	 * @param task the task whose produced partitions are queried
	 */
	public OutputBuffersGauge(Task task) {
		this.task = task;
	}

	/**
	 * Adds up {@code getNumberOfQueuedBuffers()} over every partition returned by
	 * {@code task.getProducedPartitions()}.
	 *
	 * @return the summed queue length; 0 when the task reports no partitions
	 */
	@Override
	public Integer getValue() {
		int queued = 0;
		for (ResultPartition partition : task.getProducedPartitions()) {
			queued += partition.getNumberOfQueuedBuffers();
		}
		return queued;
	}
}

/**
 * Gauge exposing the usage ratio of the buffer pools behind a task's input gates.
 */
public static class InputBufferPoolUsageGauge implements Gauge<Float> {

	/** Task whose input gate buffer pools are inspected on every read. */
	private final Task task;

	/**
	 * @param task the task whose input gates are queried
	 */
	public InputBufferPoolUsageGauge(Task task) {
		this.task = task;
	}

	/**
	 * Computes {@code (poolSize - available) / poolSize}, where both quantities are
	 * summed over the buffer pools of all gates returned by {@code task.getAllInputGates()}.
	 *
	 * @return the fraction of pool buffers that are not available, or {@code 0.0f} when
	 *         the summed pool size is zero
	 */
	@Override
	public Float getValue() {
		int free = 0;
		int capacity = 0;

		for (SingleInputGate gate : task.getAllInputGates()) {
			free += gate.getBufferPool().getNumberOfAvailableMemorySegments();
			capacity += gate.getBufferPool().getNumBuffers();
		}

		// Avoid dividing by zero when no pool capacity is reported.
		if (capacity == 0) {
			return 0.0f;
		}

		final int used = capacity - free;
		return ((float) used) / capacity;
	}
}

/**
 * Gauge exposing the usage ratio of the buffer pools behind a task's produced partitions.
 */
public static class OutputBufferPoolUsageGauge implements Gauge<Float> {

	/** Task whose produced partition buffer pools are inspected on every read. */
	private final Task task;

	/**
	 * @param task the task whose produced partitions are queried
	 */
	public OutputBufferPoolUsageGauge(Task task) {
		this.task = task;
	}

	/**
	 * Computes {@code (poolSize - available) / poolSize}, where both quantities are summed
	 * over the buffer pools of all partitions returned by {@code task.getProducedPartitions()}.
	 *
	 * @return the fraction of pool buffers that are not available, or {@code 0.0f} when
	 *         the summed pool size is zero
	 */
	@Override
	public Float getValue() {
		int free = 0;
		int capacity = 0;

		for (ResultPartition partition : task.getProducedPartitions()) {
			free += partition.getBufferPool().getNumberOfAvailableMemorySegments();
			capacity += partition.getBufferPool().getNumBuffers();
		}

		// Avoid dividing by zero when no pool capacity is reported.
		if (capacity == 0) {
			return 0.0f;
		}

		final int used = capacity - free;
		return ((float) used) / capacity;
	}
}
}
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
Expand Down Expand Up @@ -57,6 +56,7 @@
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
Expand Down Expand Up @@ -349,11 +349,11 @@ public Task(
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

// add metric for buffers
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numIn", new InputBuffersGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("numOut", new OutputBuffersGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("InPoolUsage", new InputBufferPoolUsageGauge());
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("OutPoolUsage", new OutputBufferPoolUsageGauge());
// add metrics for buffers
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1271,76 +1271,4 @@ public void run() {
}
}
}

// ------------------------------------------------------------------------
// metrics
// ------------------------------------------------------------------------

/**
 * Gauge reporting the summed queue length of the enclosing instance's {@code inputGates}.
 */
private class InputBuffersGauge implements Gauge<Integer> {

	/**
	 * @return the sum of {@code getNumberOfQueuedBuffers()} over all {@code inputGates}
	 */
	@Override
	public Integer getValue() {
		int queued = 0;
		for (SingleInputGate gate : inputGates) {
			queued += gate.getNumberOfQueuedBuffers();
		}
		return queued;
	}
}

/**
 * Gauge reporting the summed queue length of the enclosing instance's
 * {@code producedPartitions}.
 */
private class OutputBuffersGauge implements Gauge<Integer> {

	/**
	 * @return the sum of {@code getNumberOfQueuedBuffers()} over all {@code producedPartitions}
	 */
	@Override
	public Integer getValue() {
		int queued = 0;
		for (ResultPartition partition : producedPartitions) {
			queued += partition.getNumberOfQueuedBuffers();
		}
		return queued;
	}
}

/**
 * Gauge reporting the usage ratio of the buffer pools behind the enclosing instance's
 * {@code inputGates}.
 */
private class InputBufferPoolUsageGauge implements Gauge<Float> {

	/**
	 * Computes {@code (poolSize - available) / poolSize} with both values summed over the
	 * buffer pools of all {@code inputGates}.
	 *
	 * @return the fraction of pool buffers that are not available, or {@code 0.0f} when
	 *         the summed pool size is zero
	 */
	@Override
	public Float getValue() {
		int free = 0;
		int capacity = 0;

		for (SingleInputGate gate : inputGates) {
			free += gate.getBufferPool().getNumberOfAvailableMemorySegments();
			capacity += gate.getBufferPool().getNumBuffers();
		}

		// Avoid dividing by zero when no pool capacity is reported.
		if (capacity == 0) {
			return 0.0f;
		}

		final int used = capacity - free;
		return ((float) used) / capacity;
	}
}

/**
 * Gauge reporting the usage ratio of the buffer pools behind the enclosing instance's
 * {@code producedPartitions}.
 */
private class OutputBufferPoolUsageGauge implements Gauge<Float> {

	/**
	 * Computes {@code (poolSize - available) / poolSize} with both values summed over the
	 * buffer pools of all {@code producedPartitions}.
	 *
	 * @return the fraction of pool buffers that are not available, or {@code 0.0f} when
	 *         the summed pool size is zero
	 */
	@Override
	public Float getValue() {
		int free = 0;
		int capacity = 0;

		for (ResultPartition partition : producedPartitions) {
			free += partition.getBufferPool().getNumberOfAvailableMemorySegments();
			capacity += partition.getBufferPool().getNumBuffers();
		}

		// Avoid dividing by zero when no pool capacity is reported.
		if (capacity == 0) {
			return 0.0f;
		}

		final int used = capacity - free;
		return ((float) used) / capacity;
	}
}
}

0 comments on commit 986d96f

Please sign in to comment.