diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 43969e2c14369c..6e5eb2a84a6d35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -252,8 +251,7 @@ public SingleInputGate[] createInputGates( Collection inputGateDeploymentDescriptors, MetricGroup parentGroup, MetricGroup inputGroup, - MetricGroup buffersGroup, - Counter numBytesInCounter) { + MetricGroup buffersGroup) { synchronized (lock) { Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); @@ -265,8 +263,7 @@ public SingleInputGate[] createInputGates( taskName, igdd, partitionProducerStateProvider, - inputChannelMetrics, - numBytesInCounter); + inputChannelMetrics); InputGateID id = new InputGateID(igdd.getConsumedResultId(), executionId); inputGatesById.put(id, inputGate); inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index d1da4388c1b2be..4c6a2238a257d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -43,26 +44,32 @@ public class BufferOrEvent { private int channelIndex; - BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) { + private final int size; + + public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) { this.buffer = checkNotNull(buffer); this.event = null; this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } - BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) { + public BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable, int size) { this.buffer = null; this.event = checkNotNull(event); this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } + @VisibleForTesting public BufferOrEvent(Buffer buffer, int channelIndex) { - this(buffer, channelIndex, true); + this(buffer, channelIndex, true, 0); } + @VisibleForTesting public BufferOrEvent(AbstractEvent event, int channelIndex) { - this(event, channelIndex, true); + this(event, channelIndex, true, 0); } public boolean isBuffer() { @@ -96,11 +103,15 @@ boolean moreAvailable() { @Override public String toString() { - return String.format("BufferOrEvent [%s, channelIndex = %d]", - isBuffer() ? buffer : event, channelIndex); + return String.format("BufferOrEvent [%s, channelIndex = %d, size = %d]", + isBuffer() ? buffer : event, channelIndex, size); } public void setMoreAvailable(boolean moreAvailable) { this.moreAvailable = moreAvailable; } + + public int getSize() { + return size; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 5e5a722ffd51f2..816866b2853920 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.AbstractEvent; @@ -165,8 +164,6 @@ public class SingleInputGate extends InputGate { /** A timer to retrigger local partition requests. Only initialized if actually needed. */ private Timer retriggerLocalRequestTimer; - private final Counter numBytesIn; - private final SupplierWithException bufferPoolFactory; private final CompletableFuture closeFuture; @@ -178,7 +175,6 @@ public SingleInputGate( int consumedSubpartitionIndex, int numberOfInputChannels, PartitionProducerStateProvider partitionProducerStateProvider, - Counter numBytesIn, boolean isCreditBased, SupplierWithException bufferPoolFactory) { @@ -200,8 +196,6 @@ public SingleInputGate( this.partitionProducerStateProvider = checkNotNull(partitionProducerStateProvider); - this.numBytesIn = checkNotNull(numBytesIn); - this.isCreditBased = isCreditBased; this.closeFuture = new CompletableFuture<>(); @@ -566,9 +560,9 @@ private BufferOrEvent transformToBufferOrEvent( Buffer buffer, boolean moreAvailable, InputChannel currentChannel) throws IOException, InterruptedException { - numBytesIn.inc(buffer.getSizeUnsafe()); + final int size = buffer.getSizeUnsafe(); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable, size); } else { final AbstractEvent event; @@ -596,7 +590,7 @@ private BufferOrEvent transformToBufferOrEvent( currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, size); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index fcc36659edd78f..6caf0174fac706 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -99,8 +98,7 @@ public SingleInputGate create( @Nonnull String owningTaskName, @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, - @Nonnull InputChannelMetrics metrics, - @Nonnull Counter numBytesInCounter) { + @Nonnull InputChannelMetrics metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); @@ -116,7 +114,6 @@ public SingleInputGate create( consumedSubpartitionIndex, icdd.length, partitionProducerStateProvider, - numBytesInCounter, isCreditBased, createBufferPoolFactory(icdd.length, consumedPartitionType)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java new file mode 100644 index 00000000000000..40694f3fb5729f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class wraps {@link InputGate} provided by shuffle service and it is mainly + * used for increasing general input metrics from {@link TaskIOMetricGroup}. + */ +public class InputGateWithMetrics extends InputGate { + + private final InputGate inputGate; + + private final Counter numBytesIn; + + public InputGateWithMetrics(InputGate inputGate, Counter numBytesIn) { + this.inputGate = checkNotNull(inputGate); + this.numBytesIn = checkNotNull(numBytesIn); + } + + @Override + public CompletableFuture isAvailable() { + return inputGate.isAvailable(); + } + + @Override + public int getNumberOfInputChannels() { + return inputGate.getNumberOfInputChannels(); + } + + @Override + public String getOwningTaskName() { + return inputGate.getOwningTaskName(); + } + + @Override + public boolean isFinished() { + return inputGate.isFinished(); + } + + @Override + public void setup() throws IOException { + inputGate.setup(); + } + + @Override + public void requestPartitions() throws IOException, InterruptedException { + inputGate.requestPartitions(); + } + + @Override + public Optional getNextBufferOrEvent() throws IOException, InterruptedException { + Optional bufferOrEvent = inputGate.getNextBufferOrEvent(); + bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize())); + return bufferOrEvent; + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + Optional bufferOrEvent = inputGate.pollNextBufferOrEvent(); + bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize())); + return bufferOrEvent; + } + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException { + inputGate.sendTaskEvent(event); + } + + @Override + public int getPageSize() { + return inputGate.getPageSize(); + } + + @Override + public void close() throws Exception { + inputGate.close(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 57d8c6bdc20a32..0d124d1f4d45dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -192,7 +192,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid private final ResultPartitionWriter[] producedPartitions; - private final SingleInputGate[] inputGates; + private final InputGate[] inputGates; /** Connection to the task manager. */ private final TaskManagerActions taskManagerActions; @@ -380,15 +380,20 @@ public Task( buffersGroup); // consumed intermediate result partitions - this.inputGates = networkEnvironment.createInputGates( + InputGate[] gates = networkEnvironment.createInputGates( taskNameWithSubtaskAndId, executionId, this, inputGateDeploymentDescriptors, metrics.getIOMetricGroup(), inputGroup, - buffersGroup, - metrics.getIOMetricGroup().getNumBytesInCounter()); + buffersGroup); + + this.inputGates = new InputGate[gates.length]; + int counter = 0; + for (InputGate gate : gates) { + inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter()); + } invokableHasBeenCanceled = new AtomicBoolean(false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index d670d01743e09f..10221c5e86baf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; @@ -331,7 +330,6 @@ public FairnessVerifyingInputGate( consumedSubpartitionIndex, numberOfInputChannels, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - new SimpleCounter(), isCreditBased, STUB_BUFFER_POOL_FACTORY); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 51eba307403441..8c04d5f60b85dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; @@ -51,8 +49,6 @@ public class SingleInputGateBuilder { private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER; - private final Counter numBytesInCounter = new SimpleCounter(); - private boolean isCreditBased = true; private SupplierWithException bufferPoolFactory = () -> { @@ -99,7 +95,6 @@ public SingleInputGate build() { consumedSubpartitionIndex, numberOfChannels, partitionProducerStateProvider, - numBytesInCounter, isCreditBased, bufferPoolFactory); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index c87f99f4e932ff..5bbb26a32944a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -347,8 +346,7 @@ public void testRequestBackoffConfiguration() throws Exception { "TestTask", gateDesc, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); try { assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType()); @@ -599,8 +597,7 @@ private static Map createInputGateWithLocalChannel Arrays.asList(gateDescs), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), - new UnregisteredMetricsGroup(), - new SimpleCounter()); + new UnregisteredMetricsGroup()); Map inputGatesById = new HashMap<>(); for (int i = 0; i < numberOfGates; i++) { inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 700043092dfc42..8350e254be74c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -374,7 +374,7 @@ public BufferOrEvent getNext() throws IOException { Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE); buf.setSize(length); - return new BufferOrEvent(buf, channel); + return new BufferOrEvent(buf, channel, true, length); } else { // deserialize event @@ -399,7 +399,7 @@ public BufferOrEvent getNext() throws IOException { AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader()); buffer.limit(oldLimit); - return new BufferOrEvent(evt, channel); + return new BufferOrEvent(evt, channel, true, length); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 28e1b914a505f1..5ae8bd23d63e37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskmanager.InputGateWithMetrics; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ConfigurationParserUtils; @@ -235,12 +236,7 @@ private InputGate createInputGate(TaskManagerLocation senderLocation) throws IOE senderLocation, channel); - final SingleInputGate gate = gateFactory.create( - "receiving task[" + channel + "]", - gateDescriptor, - SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + final InputGate gate = createInputGateWithMetrics(gateFactory, gateDescriptor, channel); gate.setup(); gates[channel] = gate; @@ -270,4 +266,18 @@ private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor( consumedSubpartitionIndex, channelDescriptors); } + + private InputGate createInputGateWithMetrics( + SingleInputGateFactory gateFactory, + InputGateDeploymentDescriptor gateDescriptor, + int channelIndex) { + + final SingleInputGate singleGate = gateFactory.create( + "receiving task[" + channelIndex + "]", + gateDescriptor, + SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); + + return new InputGateWithMetrics(singleGate, new SimpleCounter()); + } }