From f9232ca1f1f9c6fdb041f772afdd5a8a3190a86d Mon Sep 17 00:00:00 2001 From: "taojiang.wzj@alibaba-inc.com" Date: Thu, 23 May 2019 17:21:23 +0800 Subject: [PATCH] [FLINK-12201][network] Refactor the metric of numBytesIn out of SingleInputGate In order to further simplify the interface method of ShuffleService#createInputGates, we move the numBytesIn counter from SingleInputGate to StreamInputProcessor/StreamTwoInputProcessor. The general numBytesIn metric comes from TaskIOMetric, which is already available in the constructors of the above two processors, so it is reasonable and simple to do so. --- .../io/network/NetworkEnvironment.java | 7 ++---- .../partition/consumer/BufferOrEvent.java | 23 ++++++++++++++----- .../partition/consumer/SingleInputGate.java | 12 +++------- .../consumer/SingleInputGateFactory.java | 7 ++---- .../flink/runtime/taskmanager/Task.java | 3 +-- .../partition/InputGateFairnessTest.java | 4 +--- .../consumer/SingleInputGateBuilder.java | 5 ---- .../consumer/SingleInputGateTest.java | 4 +--- .../streaming/runtime/io/BufferSpiller.java | 4 ++-- .../runtime/io/StreamInputProcessor.java | 4 ++++ .../runtime/io/StreamTwoInputProcessor.java | 5 +++- .../StreamNetworkBenchmarkEnvironment.java | 4 +--- 12 files changed, 38 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 1f2ee7ea047fc9..0bf9b0fa5d2bf8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -233,8 +232,7 @@ public SingleInputGate[] createInputGates( Collection inputGateDeploymentDescriptors, MetricGroup parentGroup, MetricGroup inputGroup, - MetricGroup buffersGroup, - Counter numBytesInCounter) { + MetricGroup buffersGroup) { synchronized (lock) { Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); @@ -247,8 +245,7 @@ public SingleInputGate[] createInputGates( jobId, igdd, taskActions, - inputChannelMetrics, - numBytesInCounter); + inputChannelMetrics); } registerInputMetrics(inputGroup, buffersGroup, inputGates); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index d1da4388c1b2be..75c366ddd9c93b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -43,26 +44,32 @@ public class BufferOrEvent { private int channelIndex; - BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) { + private final int size; + + public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) { this.buffer = checkNotNull(buffer); this.event = null; this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } - BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) { + public BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable, int size) { this.buffer = null; this.event = 
checkNotNull(event); this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } + @VisibleForTesting public BufferOrEvent(Buffer buffer, int channelIndex) { - this(buffer, channelIndex, true); + this(buffer, channelIndex, true, 0); } + @VisibleForTesting public BufferOrEvent(AbstractEvent event, int channelIndex) { - this(event, channelIndex, true); + this(event, channelIndex, true, 0); } public boolean isBuffer() { @@ -96,11 +103,15 @@ boolean moreAvailable() { @Override public String toString() { - return String.format("BufferOrEvent [%s, channelIndex = %d]", - isBuffer() ? buffer : event, channelIndex); + return String.format("BufferOrEvent [%s, channelIndex = %d, size = %d]", + isBuffer() ? buffer : event, channelIndex, size); } public void setMoreAvailable(boolean moreAvailable) { this.moreAvailable = moreAvailable; } + + public int getSize() { + return size; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 6c23698f041699..efc6ebae5f3ec9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.AbstractEvent; @@ -172,8 +171,6 @@ public class SingleInputGate extends InputGate { /** A timer to retrigger local partition requests. Only initialized if actually needed. 
*/ private Timer retriggerLocalRequestTimer; - private final Counter numBytesIn; - private final SupplierWithException bufferPoolFactory; public SingleInputGate( @@ -184,7 +181,6 @@ public SingleInputGate( int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, - Counter numBytesIn, boolean isCreditBased, SupplierWithException bufferPoolFactory) { @@ -207,8 +203,6 @@ public SingleInputGate( this.taskActions = checkNotNull(taskActions); - this.numBytesIn = checkNotNull(numBytesIn); - this.isCreditBased = isCreditBased; } @@ -562,9 +556,9 @@ private BufferOrEvent transformToBufferOrEvent( Buffer buffer, boolean moreAvailable, InputChannel currentChannel) throws IOException, InterruptedException { - numBytesIn.inc(buffer.getSizeUnsafe()); + final int size = buffer.getSizeUnsafe(); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable, size); } else { final AbstractEvent event; @@ -592,7 +586,7 @@ private BufferOrEvent transformToBufferOrEvent( currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, size); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index cf2820dd34fcc7..2b30bec056c9f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import 
org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -101,8 +100,7 @@ public SingleInputGate create( @Nonnull JobID jobId, @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull TaskActions taskActions, - @Nonnull InputChannelMetrics metrics, - @Nonnull Counter numBytesInCounter) { + @Nonnull InputChannelMetrics metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); @@ -113,8 +111,7 @@ public SingleInputGate create( final SingleInputGate inputGate = new SingleInputGate( owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex, - icdd.length, taskActions, numBytesInCounter, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + icdd.length, taskActions, isCreditBased, createBufferPoolFactory(icdd.length, consumedPartitionType)); // Create the input channels. There is one input channel for each consumed partition. 
final InputChannel[] inputChannels = new InputChannel[icdd.length]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1d95231c96a2f6..c994260acba173 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -389,8 +389,7 @@ public Task( inputGateDeploymentDescriptors, metrics.getIOMetricGroup(), inputGroup, - buffersGroup, - metrics.getIOMetricGroup().getNumBytesInCounter()); + buffersGroup); this.inputGatesById = new HashMap<>(); for (SingleInputGate inputGate : inputGates) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index 14ebabcf154a52..a3cdc8390cee7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; @@ -332,8 +331,7 @@ public FairnessVerifyingInputGate( boolean isCreditBased) { super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED, - consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), - isCreditBased, STUB_BUFFER_POOL_FACTORY); + consumedSubpartitionIndex, numberOfInputChannels, taskActions, isCreditBased, STUB_BUFFER_POOL_FACTORY); try { Field f = 
SingleInputGate.class.getDeclaredField("inputChannelsWithData"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 6f27baf7a0579a..496fad5258fd84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -19,8 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -49,8 +47,6 @@ public class SingleInputGateBuilder { private final TaskActions taskActions = new NoOpTaskActions(); - private final Counter numBytesInCounter = new SimpleCounter(); - private boolean isCreditBased = true; private SupplierWithException bufferPoolFactory = () -> { @@ -98,7 +94,6 @@ public SingleInputGate build() { consumedSubpartitionIndex, numberOfChannels, taskActions, - numBytesInCounter, isCreditBased, bufferPoolFactory); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 93c337500cddbd..5da8b390d92c50 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -20,7 +20,6 @@ import 
org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -343,8 +342,7 @@ public void testRequestBackoffConfiguration() throws Exception { new JobID(), gateDesc, new NoOpTaskActions(), - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); try { assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 700043092dfc42..8350e254be74c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -374,7 +374,7 @@ public BufferOrEvent getNext() throws IOException { Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE); buf.setSize(length); - return new BufferOrEvent(buf, channel); + return new BufferOrEvent(buf, channel, true, length); } else { // deserialize event @@ -399,7 +399,7 @@ public BufferOrEvent getNext() throws IOException { AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader()); buffer.limit(oldLimit); - return new BufferOrEvent(evt, channel); + return new BufferOrEvent(evt, channel, true, length); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 
a9c64b5f6fe9ef..727d7896fd5091 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -104,6 +104,7 @@ public class StreamInputProcessor { // ---------------- Metrics ------------------ + private final Counter numBytesIn; private final WatermarkGauge watermarkGauge; private Counter numRecordsIn; @@ -152,6 +153,8 @@ public StreamInputProcessor( this.watermarkGauge = watermarkGauge; metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos); + + this.numBytesIn = metrics.getNumBytesInCounter(); } public boolean processInput() throws Exception { @@ -212,6 +215,7 @@ public boolean processInput() throws Exception { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); + numBytesIn.inc(bufferOrEvent.getSize()); } else { // Event received diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index ab4f90dcf23f06..964d3f22fc9141 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -121,6 +121,7 @@ public class StreamTwoInputProcessor { private final WatermarkGauge input1WatermarkGauge; private final WatermarkGauge input2WatermarkGauge; + private final Counter numBytesIn; private Counter numRecordsIn; private boolean isFinished; @@ -184,6 +185,8 @@ public StreamTwoInputProcessor( this.input1WatermarkGauge = input1WatermarkGauge; this.input2WatermarkGauge = input2WatermarkGauge; metrics.gauge("checkpointAlignmentTime", 
barrierHandler::getAlignmentDurationNanos); + + this.numBytesIn = metrics.getNumBytesInCounter(); } public boolean processInput() throws Exception { @@ -277,7 +280,7 @@ else if (recordOrWatermark.isLatencyMarker()) { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - + numBytesIn.inc(bufferOrEvent.getSize()); } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index af5922096139f9..f17ba7687abb83 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -261,8 +260,7 @@ private InputGate createInputGate( jobId, gateDescriptor, new NoOpTaskActions(), - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); gate.setup(); gates[channel] = gate;