Skip to content

Commit

Permalink
[FLINK-12201][network] Refactor the metric of numBytesIn out of Singl…
Browse files Browse the repository at this point in the history
…eInputGate

In order to further simplify the interface method of ShuffleService#createInputGates, we could refactor the numBytesIn counter
from SingleInputGate to StreamInputProcessor/StreamTwoInputProcessor. The general numBytesIn metric is from TaskIOMetric which
already exists in constructor of above two processors, so it is reasonable and simple to do so.
  • Loading branch information
taojiang.wzj@alibaba-inc.com authored and taojiang.wzj@alibaba-inc.com committed May 23, 2019
1 parent 7a3f081 commit f9232ca
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 44 deletions.
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -233,8 +232,7 @@ public SingleInputGate[] createInputGates(
Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
MetricGroup parentGroup,
MetricGroup inputGroup,
MetricGroup buffersGroup,
Counter numBytesInCounter) {
MetricGroup buffersGroup) {
synchronized (lock) {
Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");

Expand All @@ -247,8 +245,7 @@ public SingleInputGate[] createInputGates(
jobId,
igdd,
taskActions,
inputChannelMetrics,
numBytesInCounter);
inputChannelMetrics);
}

registerInputMetrics(inputGroup, buffersGroup, inputGates);
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;

Expand All @@ -43,26 +44,32 @@ public class BufferOrEvent {

private int channelIndex;

BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) {
private final int size;

public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) {
this.buffer = checkNotNull(buffer);
this.event = null;
this.channelIndex = channelIndex;
this.moreAvailable = moreAvailable;
this.size = size;
}

BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) {
public BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable, int size) {
this.buffer = null;
this.event = checkNotNull(event);
this.channelIndex = channelIndex;
this.moreAvailable = moreAvailable;
this.size = size;
}

@VisibleForTesting
public BufferOrEvent(Buffer buffer, int channelIndex) {
this(buffer, channelIndex, true);
this(buffer, channelIndex, true, 0);
}

@VisibleForTesting
public BufferOrEvent(AbstractEvent event, int channelIndex) {
this(event, channelIndex, true);
this(event, channelIndex, true, 0);
}

public boolean isBuffer() {
Expand Down Expand Up @@ -96,11 +103,15 @@ boolean moreAvailable() {

@Override
public String toString() {
return String.format("BufferOrEvent [%s, channelIndex = %d]",
isBuffer() ? buffer : event, channelIndex);
return String.format("BufferOrEvent [%s, channelIndex = %d, size = %d%]",
isBuffer() ? buffer : event, channelIndex, size);
}

public void setMoreAvailable(boolean moreAvailable) {
this.moreAvailable = moreAvailable;
}

public int getSize() {
return size;
}
}
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
Expand Down Expand Up @@ -172,8 +171,6 @@ public class SingleInputGate extends InputGate {
/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;

private final Counter numBytesIn;

private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;

public SingleInputGate(
Expand All @@ -184,7 +181,6 @@ public SingleInputGate(
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
Counter numBytesIn,
boolean isCreditBased,
SupplierWithException<BufferPool, IOException> bufferPoolFactory) {

Expand All @@ -207,8 +203,6 @@ public SingleInputGate(

this.taskActions = checkNotNull(taskActions);

this.numBytesIn = checkNotNull(numBytesIn);

this.isCreditBased = isCreditBased;
}

Expand Down Expand Up @@ -562,9 +556,9 @@ private BufferOrEvent transformToBufferOrEvent(
Buffer buffer,
boolean moreAvailable,
InputChannel currentChannel) throws IOException, InterruptedException {
numBytesIn.inc(buffer.getSizeUnsafe());
final int size = buffer.getSizeUnsafe();
if (buffer.isBuffer()) {
return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable, size);
}
else {
final AbstractEvent event;
Expand Down Expand Up @@ -592,7 +586,7 @@ private BufferOrEvent transformToBufferOrEvent(
currentChannel.releaseAllResources();
}

return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, size);
}
}

Expand Down
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
Expand Down Expand Up @@ -101,8 +100,7 @@ public SingleInputGate create(
@Nonnull JobID jobId,
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull TaskActions taskActions,
@Nonnull InputChannelMetrics metrics,
@Nonnull Counter numBytesInCounter) {
@Nonnull InputChannelMetrics metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());

Expand All @@ -113,8 +111,7 @@ public SingleInputGate create(

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, numBytesInCounter, isCreditBased,
createBufferPoolFactory(icdd.length, consumedPartitionType));
icdd.length, taskActions, isCreditBased, createBufferPoolFactory(icdd.length, consumedPartitionType));

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand Down
Expand Up @@ -389,8 +389,7 @@ public Task(
inputGateDeploymentDescriptors,
metrics.getIOMetricGroup(),
inputGroup,
buffersGroup,
metrics.getIOMetricGroup().getNumBytesInCounter());
buffersGroup);

this.inputGatesById = new HashMap<>();
for (SingleInputGate inputGate : inputGates) {
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
Expand Down Expand Up @@ -332,8 +331,7 @@ public FairnessVerifyingInputGate(
boolean isCreditBased) {

super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(),
isCreditBased, STUB_BUFFER_POOL_FACTORY);
consumedSubpartitionIndex, numberOfInputChannels, taskActions, isCreditBased, STUB_BUFFER_POOL_FACTORY);

try {
Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
Expand Down
Expand Up @@ -19,8 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
Expand Down Expand Up @@ -49,8 +47,6 @@ public class SingleInputGateBuilder {

private final TaskActions taskActions = new NoOpTaskActions();

private final Counter numBytesInCounter = new SimpleCounter();

private boolean isCreditBased = true;

private SupplierWithException<BufferPool, IOException> bufferPoolFactory = () -> {
Expand Down Expand Up @@ -98,7 +94,6 @@ public SingleInputGate build() {
consumedSubpartitionIndex,
numberOfChannels,
taskActions,
numBytesInCounter,
isCreditBased,
bufferPoolFactory);
}
Expand Down
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
Expand Down Expand Up @@ -343,8 +342,7 @@ public void testRequestBackoffConfiguration() throws Exception {
new JobID(),
gateDesc,
new NoOpTaskActions(),
InputChannelTestUtils.newUnregisteredInputChannelMetrics(),
new SimpleCounter());
InputChannelTestUtils.newUnregisteredInputChannelMetrics());

try {
assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
Expand Down
Expand Up @@ -374,7 +374,7 @@ public BufferOrEvent getNext() throws IOException {
Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
buf.setSize(length);

return new BufferOrEvent(buf, channel);
return new BufferOrEvent(buf, channel, true, length);
}
else {
// deserialize event
Expand All @@ -399,7 +399,7 @@ public BufferOrEvent getNext() throws IOException {
AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
buffer.limit(oldLimit);

return new BufferOrEvent(evt, channel);
return new BufferOrEvent(evt, channel, true, length);
}
}

Expand Down
Expand Up @@ -104,6 +104,7 @@ public class StreamInputProcessor<IN> {

// ---------------- Metrics ------------------

private final Counter numBytesIn;
private final WatermarkGauge watermarkGauge;
private Counter numRecordsIn;

Expand Down Expand Up @@ -152,6 +153,8 @@ public StreamInputProcessor(

this.watermarkGauge = watermarkGauge;
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.numBytesIn = metrics.getNumBytesInCounter();
}

public boolean processInput() throws Exception {
Expand Down Expand Up @@ -212,6 +215,7 @@ public boolean processInput() throws Exception {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
numBytesIn.inc(bufferOrEvent.getSize());
}
else {
// Event received
Expand Down
Expand Up @@ -121,6 +121,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
private final WatermarkGauge input1WatermarkGauge;
private final WatermarkGauge input2WatermarkGauge;

private final Counter numBytesIn;
private Counter numRecordsIn;

private boolean isFinished;
Expand Down Expand Up @@ -184,6 +185,8 @@ public StreamTwoInputProcessor(
this.input1WatermarkGauge = input1WatermarkGauge;
this.input2WatermarkGauge = input2WatermarkGauge;
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.numBytesIn = metrics.getNumBytesInCounter();
}

public boolean processInput() throws Exception {
Expand Down Expand Up @@ -277,7 +280,7 @@ else if (recordOrWatermark.isLatencyMarker()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());

numBytesIn.inc(bufferOrEvent.getSize());
} else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -261,8 +260,7 @@ private InputGate createInputGate(
jobId,
gateDescriptor,
new NoOpTaskActions(),
InputChannelTestUtils.newUnregisteredInputChannelMetrics(),
new SimpleCounter());
InputChannelTestUtils.newUnregisteredInputChannelMetrics());

gate.setup();
gates[channel] = gate;
Expand Down

0 comments on commit f9232ca

Please sign in to comment.