From 92bdba5eb06e187a2dc0c2b4c2e26ce568feab71 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Thu, 30 May 2019 23:15:27 +0800 Subject: [PATCH] [FLINK-12201][network,metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric Incrementing of numBytesIn metric in SingleInputGate does not depend on shuffle service and can be moved out of network internals into Task. Task could wrap InputGate provided by ShuffleService with InputGateWithMetrics which would increment numBytesIn metric. --- .../io/network/NetworkEnvironment.java | 7 +- .../partition/consumer/BufferOrEvent.java | 23 +++- .../partition/consumer/SingleInputGate.java | 12 +- .../consumer/SingleInputGateFactory.java | 5 +- .../taskmanager/InputGateWithMetrics.java | 106 ++++++++++++++++++ .../flink/runtime/taskmanager/Task.java | 13 ++- .../partition/InputGateFairnessTest.java | 2 - .../consumer/SingleInputGateBuilder.java | 5 - .../consumer/SingleInputGateTest.java | 7 +- .../streaming/runtime/io/BufferSpiller.java | 4 +- .../StreamNetworkBenchmarkEnvironment.java | 22 +++- 11 files changed, 158 insertions(+), 48 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 43969e2c14369c..6e5eb2a84a6d35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -252,8 +251,7 @@ public SingleInputGate[] createInputGates( Collection inputGateDeploymentDescriptors, MetricGroup parentGroup, MetricGroup inputGroup, - MetricGroup buffersGroup, - Counter numBytesInCounter) { + MetricGroup buffersGroup) { synchronized (lock) { Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); @@ -265,8 +263,7 @@ public SingleInputGate[] createInputGates( taskName, igdd, partitionProducerStateProvider, - inputChannelMetrics, - numBytesInCounter); + inputChannelMetrics); InputGateID id = new InputGateID(igdd.getConsumedResultId(), executionId); inputGatesById.put(id, inputGate); inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index d1da4388c1b2be..4c6a2238a257d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -43,26 +44,32 @@ public class BufferOrEvent { private int channelIndex; - BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) { + private final int size; + + public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) { this.buffer = checkNotNull(buffer); this.event = null; this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } - BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) { + public BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable, int size) { this.buffer = null; this.event = checkNotNull(event); this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; } + @VisibleForTesting public BufferOrEvent(Buffer buffer, int channelIndex) { - this(buffer, channelIndex, true); + this(buffer, channelIndex, true, 0); } + @VisibleForTesting public BufferOrEvent(AbstractEvent event, int channelIndex) { - this(event, channelIndex, true); + this(event, channelIndex, true, 0); } public boolean isBuffer() { @@ -96,11 +103,15 @@ boolean moreAvailable() { @Override public String toString() { - return String.format("BufferOrEvent [%s, channelIndex = %d]", - isBuffer() ? buffer : event, channelIndex); + return String.format("BufferOrEvent [%s, channelIndex = %d, size = %d]", + isBuffer() ? buffer : event, channelIndex, size); } public void setMoreAvailable(boolean moreAvailable) { this.moreAvailable = moreAvailable; } + + public int getSize() { + return size; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 5e5a722ffd51f2..816866b2853920 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.AbstractEvent; @@ -165,8 +164,6 @@ public class SingleInputGate extends InputGate { /** A timer to retrigger local partition requests. Only initialized if actually needed. */ private Timer retriggerLocalRequestTimer; - private final Counter numBytesIn; - private final SupplierWithException bufferPoolFactory; private final CompletableFuture closeFuture; @@ -178,7 +175,6 @@ public SingleInputGate( int consumedSubpartitionIndex, int numberOfInputChannels, PartitionProducerStateProvider partitionProducerStateProvider, - Counter numBytesIn, boolean isCreditBased, SupplierWithException bufferPoolFactory) { @@ -200,8 +196,6 @@ public SingleInputGate( this.partitionProducerStateProvider = checkNotNull(partitionProducerStateProvider); - this.numBytesIn = checkNotNull(numBytesIn); - this.isCreditBased = isCreditBased; this.closeFuture = new CompletableFuture<>(); @@ -566,9 +560,9 @@ private BufferOrEvent transformToBufferOrEvent( Buffer buffer, boolean moreAvailable, InputChannel currentChannel) throws IOException, InterruptedException { - numBytesIn.inc(buffer.getSizeUnsafe()); + final int size = buffer.getSizeUnsafe(); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable, size); } else { final AbstractEvent event; @@ -596,7 +590,7 @@ private BufferOrEvent transformToBufferOrEvent( currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); + return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, size); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index fcc36659edd78f..6caf0174fac706 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -99,8 +98,7 @@ public SingleInputGate create( @Nonnull String owningTaskName, @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, - @Nonnull InputChannelMetrics metrics, - @Nonnull Counter numBytesInCounter) { + @Nonnull InputChannelMetrics metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); @@ -116,7 +114,6 @@ public SingleInputGate create( consumedSubpartitionIndex, icdd.length, partitionProducerStateProvider, - numBytesInCounter, isCreditBased, createBufferPoolFactory(icdd.length, consumedPartitionType)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java new file mode 100644 index 00000000000000..40694f3fb5729f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class wraps {@link InputGate} provided by shuffle service and it is mainly + * used for increasing general input metrics from {@link TaskIOMetricGroup}. + */ +public class InputGateWithMetrics extends InputGate { + + private final InputGate inputGate; + + private final Counter numBytesIn; + + public InputGateWithMetrics(InputGate inputGate, Counter numBytesIn) { + this.inputGate = checkNotNull(inputGate); + this.numBytesIn = checkNotNull(numBytesIn); + } + + @Override + public CompletableFuture isAvailable() { + return inputGate.isAvailable(); + } + + @Override + public int getNumberOfInputChannels() { + return inputGate.getNumberOfInputChannels(); + } + + @Override + public String getOwningTaskName() { + return inputGate.getOwningTaskName(); + } + + @Override + public boolean isFinished() { + return inputGate.isFinished(); + } + + @Override + public void setup() throws IOException { + inputGate.setup(); + } + + @Override + public void requestPartitions() throws IOException, InterruptedException { + inputGate.requestPartitions(); + } + + @Override + public Optional getNextBufferOrEvent() throws IOException, InterruptedException { + Optional bufferOrEvent = inputGate.getNextBufferOrEvent(); + bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize())); + return bufferOrEvent; + } + + @Override + public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + Optional bufferOrEvent = inputGate.pollNextBufferOrEvent(); + bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize())); + return bufferOrEvent; + } + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException { + inputGate.sendTaskEvent(event); + } + + @Override + public int getPageSize() { + return inputGate.getPageSize(); + } + + @Override + public void close() throws Exception { + inputGate.close(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 57d8c6bdc20a32..0d124d1f4d45dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -192,7 +192,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid private final ResultPartitionWriter[] producedPartitions; - private final SingleInputGate[] inputGates; + private final InputGate[] inputGates; /** Connection to the task manager. */ private final TaskManagerActions taskManagerActions; @@ -380,15 +380,20 @@ public Task( buffersGroup); // consumed intermediate result partitions - this.inputGates = networkEnvironment.createInputGates( + InputGate[] gates = networkEnvironment.createInputGates( taskNameWithSubtaskAndId, executionId, this, inputGateDeploymentDescriptors, metrics.getIOMetricGroup(), inputGroup, - buffersGroup, - metrics.getIOMetricGroup().getNumBytesInCounter()); + buffersGroup); + + this.inputGates = new InputGate[gates.length]; + int counter = 0; + for (InputGate gate : gates) { + inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter()); + } invokableHasBeenCanceled = new AtomicBoolean(false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index d670d01743e09f..10221c5e86baf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; @@ -331,7 +330,6 @@ public FairnessVerifyingInputGate( consumedSubpartitionIndex, numberOfInputChannels, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - new SimpleCounter(), isCreditBased, STUB_BUFFER_POOL_FACTORY); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 51eba307403441..8c04d5f60b85dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; @@ -51,8 +49,6 @@ public class SingleInputGateBuilder { private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER; - private final Counter numBytesInCounter = new SimpleCounter(); - private boolean isCreditBased = true; private SupplierWithException bufferPoolFactory = () -> { @@ -99,7 +95,6 @@ public SingleInputGate build() { consumedSubpartitionIndex, numberOfChannels, partitionProducerStateProvider, - numBytesInCounter, isCreditBased, bufferPoolFactory); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index c87f99f4e932ff..5bbb26a32944a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -347,8 +346,7 @@ public void testRequestBackoffConfiguration() throws Exception { "TestTask", gateDesc, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); try { assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType()); @@ -599,8 +597,7 @@ private static Map createInputGateWithLocalChannel Arrays.asList(gateDescs), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), - new UnregisteredMetricsGroup(), - new SimpleCounter()); + new UnregisteredMetricsGroup()); Map inputGatesById = new HashMap<>(); for (int i = 0; i < numberOfGates; i++) { inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 700043092dfc42..8350e254be74c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -374,7 +374,7 @@ public BufferOrEvent getNext() throws IOException { Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE); buf.setSize(length); - return new BufferOrEvent(buf, channel); + return new BufferOrEvent(buf, channel, true, length); } else { // deserialize event @@ -399,7 +399,7 @@ public BufferOrEvent getNext() throws IOException { AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader()); buffer.limit(oldLimit); - return new BufferOrEvent(evt, channel); + return new BufferOrEvent(evt, channel, true, length); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 28e1b914a505f1..5ae8bd23d63e37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskmanager.InputGateWithMetrics; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ConfigurationParserUtils; @@ -235,12 +236,7 @@ private InputGate createInputGate(TaskManagerLocation senderLocation) throws IOE senderLocation, channel); - final SingleInputGate gate = gateFactory.create( - "receiving task[" + channel + "]", - gateDescriptor, - SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - InputChannelTestUtils.newUnregisteredInputChannelMetrics(), - new SimpleCounter()); + final InputGate gate = createInputGateWithMetrics(gateFactory, gateDescriptor, channel); gate.setup(); gates[channel] = gate; @@ -270,4 +266,18 @@ private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor( consumedSubpartitionIndex, channelDescriptors); } + + private InputGate createInputGateWithMetrics( + SingleInputGateFactory gateFactory, + InputGateDeploymentDescriptor gateDescriptor, + int channelIndex) { + + final SingleInputGate singleGate = gateFactory.create( + "receiving task[" + channelIndex + "]", + gateDescriptor, + SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, + InputChannelTestUtils.newUnregisteredInputChannelMetrics()); + + return new InputGateWithMetrics(singleGate, new SimpleCounter()); + } }